Grouping and aggregation using Java streams

When we group the elements of a list, we can then aggregate the fields of the grouped elements to perform meaningful operations that help us analyze the data. Examples include sum, average, and maximum/minimum. Aggregating individual fields like this is easy with Java Streams and Collectors, and the Collectors documentation provides simple examples of these calculations.

However, there are more complex aggregations, such as the weighted average and the geometric average. We may also need to aggregate several fields at the same time. In this article, we show a direct way to solve such problems using Java Streams, a framework that lets us process large amounts of data quickly and effectively.

We assume that readers have a basic understanding of Java Streams and the Collectors utility class.

Problem statement

Let's consider a simple example to show the type of problem we want to solve. We will keep it very generic so that it can easily be generalized. Consider a list of TaxEntry entities defined by the following code:

public class TaxEntry {

    private String state;
    private String city;
    private int numEntries;
    private double price;
    //Constructors, getters, hashCode, equals etc
}

Calculating the total number of entries for a given city is very simple:

Map<String, Integer> totalNumEntriesByCity = 
              taxes.stream().collect(Collectors.groupingBy(TaxEntry::getCity, 
                                                           Collectors.summingInt(TaxEntry::getNumEntries)));
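To try this out, here is a minimal self-contained sketch. It assumes TaxEntry can be written as a record with the same four fields (so the accessors are city() and numEntries() rather than getCity() and getNumEntries()), and the sample entries are hypothetical values chosen only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CityTotals {

    // Assumption: a record stands in for the article's TaxEntry class.
    record TaxEntry(String state, String city, int numEntries, double price) {}

    // Groups by city and sums numEntries within each group.
    static Map<String, Integer> totalNumEntriesByCity(List<TaxEntry> taxes) {
        return taxes.stream()
                .collect(Collectors.groupingBy(TaxEntry::city,
                        Collectors.summingInt(TaxEntry::numEntries)));
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        List<TaxEntry> taxes = List.of(
                new TaxEntry("New York", "NYC", 3, 20.0),
                new TaxEntry("New York", "NYC", 2, 10.0),
                new TaxEntry("Florida", "Orlando", 4, 13.0));
        Map<String, Integer> totals = totalNumEntriesByCity(taxes);
        System.out.println(totals.get("NYC"));     // prints 5
        System.out.println(totals.get("Orlando")); // prints 4
    }
}
```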

Collectors.groupingBy accepts two parameters: a classifier function for grouping and a Collector that performs the downstream aggregation of all elements belonging to a given group. We use TaxEntry::getCity as the classifier function. For the downstream, we use Collectors::summingInt, which returns a Collector that sums the number of tax entries in each group.  

Things get a little more complicated when we want composite groups. For example, building on the previous example, suppose we want the total number of entries for each given state and city. There are several ways to do this, but a very simple one is to first define:

record StateCityGroup(String state, String city) {}

Note that we are using a Java record, which is a concise way to define immutable classes. In addition, the Java compiler generates the field accessor methods and the implementations of hashCode, equals, and toString for us. With this, the solution is straightforward:

Map<StateCityGroup, Integer> totalNumEntriesForStateCity = 
                    taxes.stream().collect(groupingBy(p -> new StateCityGroup(p.getState(), p.getCity()), 
                                                      Collectors.summingInt(TaxEntry::getNumEntries))
                                          );

For Collectors::groupingBy, we set the classifier function with a lambda expression that creates a new StateCityGroup record to encapsulate each state-city pair. The downstream Collector is the same as before.
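For comparison, another of the several possible approaches is to nest one groupingBy inside another, which produces a map of maps instead of using a single composite key. A minimal sketch, again assuming a record version of TaxEntry and hypothetical sample data:

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;

public class NestedGrouping {

    // Assumption: a record stands in for the article's TaxEntry class.
    record TaxEntry(String state, String city, int numEntries, double price) {}

    // Outer grouping by state; inner grouping by city with a sum of numEntries.
    static Map<String, Map<String, Integer>> totalsByStateThenCity(List<TaxEntry> taxes) {
        return taxes.stream()
                .collect(groupingBy(TaxEntry::state,
                        groupingBy(TaxEntry::city, summingInt(TaxEntry::numEntries))));
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        List<TaxEntry> taxes = List.of(
                new TaxEntry("New York", "NYC", 3, 20.0),
                new TaxEntry("New York", "NYC", 2, 10.0),
                new TaxEntry("New York", "Buffalo", 1, 15.0),
                new TaxEntry("Florida", "Orlando", 4, 13.0));
        Map<String, Map<String, Integer>> totals = totalsByStateThenCity(taxes);
        System.out.println(totals.get("New York").get("NYC"));     // prints 5
        System.out.println(totals.get("New York").get("Buffalo")); // prints 1
    }
}
```

The nested form works, but callers have to look up the state and then the city. A single StateCityGroup key avoids that, and it works as a map key precisely because the record's generated equals and hashCode compare by value.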

Note: for brevity, the code examples assume that all methods of the Collectors class are statically imported, so we don't have to show the class qualifier.

Things start to get more complicated when we want to do several aggregations at the same time. For example, suppose we want the total number of entries and the average price for each state and city. The library does not provide a simple solution to this problem.  

To start solving this problem, we take a cue from the previous aggregation and define a record that encapsulates all the fields that need to be aggregated:

record TaxEntryAggregation (int totalNumEntries, double averagePrice ) {}

Now, how can we aggregate two fields at the same time? We can always run two separate stream collections to compute each aggregate, as the following code shows:

Map<StateCityGroup, TaxEntryAggregation> aggregationByStateCity = taxes.stream().collect(
           groupingBy(p -> new StateCityGroup(p.getState(), p.getCity()),
                      collectingAndThen(Collectors.toList(), 
                                        list -> {int entries = list.stream().collect(
                                                                   summingInt(TaxEntry::getNumEntries));
                                                 double priceAverage = list.stream().collect(
                                                                   averagingDouble(TaxEntry::getPrice));
                                                 return new TaxEntryAggregation(entries, priceAverage);})));

Grouping is done as before, but for the downstream we use Collectors::collectingAndThen (line 3) to perform the aggregation. This function takes two parameters:

  • The downstream collector, which collects the elements of each group into a list (Collectors::toList(), line 3)
  • The finisher function (lines 4-8), a lambda expression that creates two separate streams from that list, computes one aggregate with each, and combines the results into a new TaxEntryAggregation record

Now imagine that we want to aggregate even more fields at the same time. We would need one more stream over the downstream list for each additional field. The code becomes inefficient, highly repetitive, and far from ideal. We should look for a better alternative.
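For the specific case of exactly two aggregates, Java 12 and later provide Collectors.teeing, which passes every element to two downstream collectors in a single pass and then merges their results. The sketch below assumes a record version of TaxEntry (records themselves need Java 16+) and hypothetical sample data. It does not remove the limitations discussed next: it still relies on the primitive-only summing/averaging collectors, and more than two aggregates require nesting teeing calls.

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.averagingDouble;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingInt;
import static java.util.stream.Collectors.teeing;

public class TeeingAggregation {

    // Assumption: records stand in for the article's classes.
    record TaxEntry(String state, String city, int numEntries, double price) {}
    record StateCityGroup(String state, String city) {}
    record TaxEntryAggregation(int totalNumEntries, double averagePrice) {}

    static Map<StateCityGroup, TaxEntryAggregation> aggregate(List<TaxEntry> taxes) {
        return taxes.stream().collect(
                groupingBy(p -> new StateCityGroup(p.state(), p.city()),
                        // teeing feeds each element of the group to both collectors,
                        // then merges the Integer and Double results into one record.
                        teeing(summingInt(TaxEntry::numEntries),
                               averagingDouble(TaxEntry::price),
                               TaxEntryAggregation::new)));
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        List<TaxEntry> taxes = List.of(
                new TaxEntry("New York", "NYC", 3, 20.0),
                new TaxEntry("New York", "NYC", 2, 10.0),
                new TaxEntry("Florida", "Orlando", 4, 13.0));
        // prints TaxEntryAggregation[totalNumEntries=5, averagePrice=15.0]
        System.out.println(aggregate(taxes).get(new StateCityGroup("New York", "NYC")));
    }
}
```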

Moreover, that is not the only problem. In general, we are limited to the types of aggregation that the Collectors helper class can perform. Its summing*, averaging*, and summarizing* methods only support the int, long, and double primitive types. What should we do with more complex types such as BigInteger or BigDecimal?  

To make matters worse, the summarizing* methods only provide summary statistics for min, max, count, sum, and average. What if we want to perform more complex calculations, such as a weighted average or a geometric average?

Some will argue that we can always write a custom Collector, but this requires understanding the Collector interface and having a good grasp of the stream collection process. It is more straightforward to use the built-in collectors returned by the utility methods of the Collectors class. In the next section, we show some strategies for achieving this.
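For comparison, this is roughly what the custom-collector route involves when written with Collector.of: a mutable container, an accumulator, a combiner (used when a parallel stream merges partial results), and a finisher. The sketch averages BigDecimal values to two decimal places; the names Acc and averagingBigDecimal are hypothetical and not part of the JDK.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;

public class BigDecimalAverage {

    // Hypothetical mutable accumulation container: running sum and count.
    static final class Acc {
        BigDecimal sum = BigDecimal.ZERO;
        long count = 0;
    }

    // Hypothetical custom collector: average of BigDecimal values, scale 2.
    static <T> Collector<T, Acc, BigDecimal> averagingBigDecimal(Function<? super T, BigDecimal> mapper) {
        return Collector.of(
                Acc::new,                               // supplier: new empty container
                (acc, t) -> {                           // accumulator: fold in one element
                    acc.sum = acc.sum.add(mapper.apply(t));
                    acc.count++;
                },
                (a, b) -> {                             // combiner: merge two partial containers
                    a.sum = a.sum.add(b.sum);
                    a.count += b.count;
                    return a;
                },
                acc -> acc.count == 0                   // finisher: divide sum by count
                        ? BigDecimal.ZERO
                        : acc.sum.divide(BigDecimal.valueOf(acc.count), 2, RoundingMode.HALF_DOWN));
    }

    public static void main(String[] args) {
        List<BigDecimal> values = List.of(
                new BigDecimal("4.00"), new BigDecimal("4.00"), new BigDecimal("6.00"));
        System.out.println(values.stream().collect(averagingBigDecimal(v -> v))); // prints 4.67
    }
}
```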

Complex multiple aggregation: solution path

Let's consider a simple example that highlights the challenges mentioned in the previous section. Suppose we have the following entity:

public class TaxEntry {
    private String state;
    private String city;
    private BigDecimal rate;
    private BigDecimal price;
    record StateCityGroup(String state, String city) {
    }
    //Constructors, getters, hashCode/equals etc
}

We start by asking how to find, for each distinct state-city pair, the total number of entries and the sum of the products of rate and price, ∑(rate × price). Note that we are using BigDecimal.

As we did in the previous section, we define a record that encapsulates the aggregation:

record RatePriceAggregation(int count, BigDecimal ratePrice) {}

It may seem surprising at first, but the most direct solution for grouping followed by a simple aggregation is Collectors::toMap. Let's see how we can do this:

Map<StateCityGroup, RatePriceAggregation> mapAggregation = taxes.stream().collect(
      toMap(p -> new StateCityGroup(p.getState(), p.getCity()), 
            p -> new RatePriceAggregation(1, p.getRate().multiply(p.getPrice())), 
            (u1,u2) -> new RatePriceAggregation( u1.count() + u2.count(), u1.ratePrice().add(u2.ratePrice()))
            ));

Collectors::toMap (line 2) accepts three parameters, which we implement as follows:

  • The first parameter is a lambda expression that generates the map key. It creates a StateCityGroup as the key, which groups the elements by state and city (line 2).
  • The second parameter produces the map value. In our example, we create a RatePriceAggregation initialized with a count of 1 and the product of rate and price (line 3).
  • Finally, the last parameter is a BinaryOperator used to merge multiple elements mapped to the same state-city key. We add up the counts and the rate-price products to aggregate them (line 4).

Here is how we set up some sample data:

List<TaxEntry> taxes = Arrays.asList(
                          new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.2), BigDecimal.valueOf(20.0)), 
                          new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.4), BigDecimal.valueOf(10.0)), 
                          new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.6), BigDecimal.valueOf(10.0)), 
                          new TaxEntry("Florida", "Orlando", BigDecimal.valueOf(0.3), BigDecimal.valueOf(13.0)));

Getting the result from the previous code example is simple:

System.out.println("New York: " + mapAggregation.get(new StateCityGroup("New York", "NYC")));

This prints:

New York: RatePriceAggregation[count=3, ratePrice=14.00]
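Putting the pieces of this section together, here is a self-contained sketch that uses the article's sample data. It assumes records in place of the TaxEntry class and its nested StateCityGroup, so the accessors are rate() and price() instead of getRate() and getPrice().

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toMap;

public class ToMapAggregation {

    // Assumption: records stand in for the article's TaxEntry class and nested StateCityGroup.
    record TaxEntry(String state, String city, BigDecimal rate, BigDecimal price) {}
    record StateCityGroup(String state, String city) {}
    record RatePriceAggregation(int count, BigDecimal ratePrice) {}

    static Map<StateCityGroup, RatePriceAggregation> aggregate(List<TaxEntry> taxes) {
        return taxes.stream().collect(
                toMap(p -> new StateCityGroup(p.state(), p.city()),                   // key: state-city pair
                      p -> new RatePriceAggregation(1, p.rate().multiply(p.price())), // value for one entry
                      (u1, u2) -> new RatePriceAggregation(u1.count() + u2.count(),    // merge on key collision
                              u1.ratePrice().add(u2.ratePrice()))));
    }

    // The sample data from the article.
    static List<TaxEntry> sampleTaxes() {
        return List.of(
                new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.2), BigDecimal.valueOf(20.0)),
                new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.4), BigDecimal.valueOf(10.0)),
                new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.6), BigDecimal.valueOf(10.0)),
                new TaxEntry("Florida", "Orlando", BigDecimal.valueOf(0.3), BigDecimal.valueOf(13.0)));
    }

    public static void main(String[] args) {
        Map<StateCityGroup, RatePriceAggregation> mapAggregation = aggregate(sampleTaxes());
        // prints New York: RatePriceAggregation[count=3, ratePrice=14.00]
        System.out.println("New York: " + mapAggregation.get(new StateCityGroup("New York", "NYC")));
    }
}
```

The scale of 14.00 comes from BigDecimal arithmetic: each operand such as 0.2 or 20.0 has one decimal place, so each product has two.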

This is a direct implementation that handles grouping and aggregation of multiple fields, including non-primitive data types (BigDecimal in our example). Its drawback, however, is that it has no finisher that would let us perform additional operations on the aggregated result. For example, we cannot compute any kind of average.

To illustrate this problem, let's consider a more complex one. Suppose that for each state and city pair we want to find the weighted average of rate times price, together with the sum of all prices. To find the weighted average, we calculate the sum of the products of rate and price over all entries belonging to each state-city pair, and then divide it by the number of entries n in that group: (1/n) Σ (rate × price).
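As a worked check, plugging in the New York sample data shown earlier (three entries, so n = 3), with r_i and p_i denoting the rate and price of the i-th entry, the formula gives:

```latex
\frac{1}{n}\sum_{i=1}^{n} r_i\,p_i
  = \frac{1}{3}\left(0.2 \cdot 20.0 + 0.4 \cdot 10.0 + 0.6 \cdot 10.0\right)
  = \frac{4.0 + 4.0 + 6.0}{3}
  = \frac{14.0}{3} \approx 4.67
```

The sum of all prices for New York is 20.0 + 10.0 + 10.0 = 40.0.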

To solve this problem, we start by defining a record that holds the aggregations:

record TaxEntryAggregation(int count, BigDecimal weightedAveragePrice, BigDecimal totalPrice) {}

With this, we can implement the following:

Map<StateCityGroup, TaxEntryAggregation> groupByAggregation = taxes.stream().collect(
    groupingBy(p -> new StateCityGroup(p.getState(), p.getCity()), 
               mapping(p -> new TaxEntryAggregation(1, p.getRate().multiply(p.getPrice()), p.getPrice()), 
                       collectingAndThen(reducing(new TaxEntryAggregation(0, BigDecimal.ZERO, BigDecimal.ZERO),
                                                  (u1,u2) -> new TaxEntryAggregation(u1.count() + u2.count(),
                                                      u1.weightedAveragePrice().add(u2.weightedAveragePrice()), 
                                                      u1.totalPrice().add(u2.totalPrice()))
                                                  ),
                                         u -> new TaxEntryAggregation(u.count(), 
                                                 u.weightedAveragePrice().divide(BigDecimal.valueOf(u.count()),
                                                                                 2, RoundingMode.HALF_DOWN), 
                                                 u.totalPrice())
                                         )
                      )
              ));

We can see that the code is somewhat more complex, but it gives us the solution we are looking for. A line-by-line walkthrough of it appears at the end of this article.

This implementation not only lets us aggregate multiple fields at the same time, but also lets us perform complex calculations in multiple stages.  

This approach extends easily to more complex problems. The recipe is simple: define a record that encapsulates all the fields to be aggregated, use Collectors::mapping to initialize the record, and then apply Collectors::collectingAndThen to perform the reduction and the final aggregation.  
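As an illustration of that recipe applied to one of the calculations mentioned at the start, here is a sketch of a geometric average of prices per state-city pair, computed as the exponential of the mean of the logarithms. It assumes a simplified record with a double price (the JDK offers no logarithm for BigDecimal), and LogAggregation is a hypothetical record introduced for this example.

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.reducing;

public class GeometricMean {

    // Assumption: simplified entity with a double price; rates are omitted.
    record TaxEntry(String state, String city, double price) {}
    record StateCityGroup(String state, String city) {}
    // Hypothetical partial aggregation: element count and running sum of log(price).
    record LogAggregation(int count, double sumLog) {}

    static Map<StateCityGroup, Double> geometricMeanPrice(List<TaxEntry> taxes) {
        return taxes.stream().collect(
                groupingBy(p -> new StateCityGroup(p.state(), p.city()),
                        // 1. Initialize one record per element.
                        mapping(p -> new LogAggregation(1, Math.log(p.price())),
                                collectingAndThen(
                                        // 2. Reduce: add counts and log sums.
                                        reducing(new LogAggregation(0, 0.0),
                                                (a, b) -> new LogAggregation(a.count() + b.count(),
                                                                             a.sumLog() + b.sumLog())),
                                        // 3. Finish: exp(mean of logs). Each group has at least
                                        //    one element, so count is never zero here.
                                        a -> Math.exp(a.sumLog() / a.count())))));
    }

    public static void main(String[] args) {
        // Same prices as the article's sample data.
        List<TaxEntry> taxes = List.of(
                new TaxEntry("New York", "NYC", 20.0),
                new TaxEntry("New York", "NYC", 10.0),
                new TaxEntry("New York", "NYC", 10.0),
                new TaxEntry("Florida", "Orlando", 13.0));
        // NYC: cube root of 20 * 10 * 10 = 2000, roughly 12.6
        System.out.println(geometricMeanPrice(taxes).get(new StateCityGroup("New York", "NYC")));
    }
}
```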

As before, we can get the aggregation for New York:

System.out.println("Finished aggregation: " + groupByAggregation.get(new StateCityGroup("New York", "NYC")));

We get the following result:

Finished aggregation: TaxEntryAggregation[count=3, weightedAveragePrice=4.67, totalPrice=40.0]
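To check this result end to end, here is a self-contained sketch of the groupingBy pipeline with the article's sample data. It assumes records in place of the article's classes (accessors rate() and price() instead of getters), and it runs the same collector on both a sequential and a parallel stream.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.reducing;

public class GroupByAggregation {

    // Assumption: records stand in for the article's classes.
    record TaxEntry(String state, String city, BigDecimal rate, BigDecimal price) {}
    record StateCityGroup(String state, String city) {}
    record TaxEntryAggregation(int count, BigDecimal weightedAveragePrice, BigDecimal totalPrice) {}

    static Map<StateCityGroup, TaxEntryAggregation> aggregate(List<TaxEntry> taxes, boolean parallel) {
        return (parallel ? taxes.parallelStream() : taxes.stream()).collect(
                groupingBy(p -> new StateCityGroup(p.state(), p.city()),
                        // Map each entry to a partial aggregation: count 1, rate * price, price.
                        mapping(p -> new TaxEntryAggregation(1, p.rate().multiply(p.price()), p.price()),
                                collectingAndThen(
                                        // Sum the counts, the rate * price products, and the prices.
                                        reducing(new TaxEntryAggregation(0, BigDecimal.ZERO, BigDecimal.ZERO),
                                                (u1, u2) -> new TaxEntryAggregation(u1.count() + u2.count(),
                                                        u1.weightedAveragePrice().add(u2.weightedAveragePrice()),
                                                        u1.totalPrice().add(u2.totalPrice()))),
                                        // Finisher: divide the summed products by the count.
                                        u -> new TaxEntryAggregation(u.count(),
                                                u.weightedAveragePrice().divide(BigDecimal.valueOf(u.count()),
                                                        2, RoundingMode.HALF_DOWN),
                                                u.totalPrice())))));
    }

    // The sample data from the article.
    static List<TaxEntry> sampleTaxes() {
        return List.of(
                new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.2), BigDecimal.valueOf(20.0)),
                new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.4), BigDecimal.valueOf(10.0)),
                new TaxEntry("New York", "NYC", BigDecimal.valueOf(0.6), BigDecimal.valueOf(10.0)),
                new TaxEntry("Florida", "Orlando", BigDecimal.valueOf(0.3), BigDecimal.valueOf(13.0)));
    }

    public static void main(String[] args) {
        StateCityGroup nyc = new StateCityGroup("New York", "NYC");
        // Both lines print TaxEntryAggregation[count=3, weightedAveragePrice=4.67, totalPrice=40.0]
        System.out.println(aggregate(sampleTaxes(), false).get(nyc));
        System.out.println(aggregate(sampleTaxes(), true).get(nyc));
    }
}
```

The parallel run gives the same result because the reduction only combines immutable records with an associative operation and BigDecimal addition is exact.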

It is also worth noting that since TaxEntryAggregation is a Java record, it is immutable, so the computation can be parallelized using the parallel support of the stream collectors library.

Conclusion

We have shown several strategies for complex multi-field grouping with aggregation, including non-primitive data types and multi-field and cross-field calculations. Because they operate on lists of records using the Java Streams and Collectors API, they give us the ability to process large amounts of data quickly and effectively.

For reference, here is a line-by-line walkthrough of the groupingBy example that combines Collectors::mapping, Collectors::collectingAndThen, and Collectors::reducing:

    • Collectors::groupingBy (line 2):
      1. As the classifier function, we create a StateCityGroup record for each state-city pair.
      2. For the downstream, we call Collectors::mapping (line 3):
        • For the first parameter, the mapper applied to each input element converts the grouped tax entry into a TaxEntryAggregation with an initial count of 1, the product of rate and price, and the price (line 3).
        • For the downstream, we call Collectors::collectingAndThen (line 4), which lets us apply a finishing transformation to the downstream collector:
          • We call Collectors::reducing (line 4) with:
            1. A default value, a zero TaxEntryAggregation, which covers the case where there are no downstream elements (line 4).
            2. A lambda expression that reduces two partial results into a new TaxEntryAggregation containing the aggregated fields (lines 5, 6, and 7).
          • The finishing transformation uses the count computed in the reduction to calculate the average and returns the final TaxEntryAggregation (lines 9, 10, and 11).

       


Added by ndorfnz on Wed, 02 Mar 2022 14:08:00 +0200