What are Aggregations?
Data aggregation is any process in which information is gathered and expressed in a summary form, for purposes such as statistical analysis. A common aggregation purpose is to get more information about particular groups based on specific variables such as gender, age, profession, or income.
Aggregating
Requires Speedment HyperStream 1.2.0 or later.
A common use case in analytical applications is to aggregate many results into a few composite values.
This can be done very efficiently using the specialized collectors built into Speedment HyperStream, which leverage the standard Java Stream API.
Since the Aggregator is designed to perform all steps of the aggregation off-heap, aggregations of large data sets can be performed with minimal heap memory footprint.
The following examples demonstrate how data entities of the type Person, defined by the following fields, can be aggregated:
private static class Person {
private final int age;
private final short height;
private final short weight;
private final String gender;
private final double salary;
...
}
To represent the results of aggregations, the class AgeSalary is used. It associates a certain age with an average salary.
private static class AgeSalary {
private int age;
private double avgSalary;
...
}
Aggregation to Explicitly Typed Results
To compute the average salary for each age, an Aggregator<Person, ?, AgeSalary> is created as follows:
Aggregator<Person, ?, AgeSalary> aggregator = Aggregator.builder(AgeSalary::new)
.on(Person::age).key(AgeSalary::setAge)
.on(Person::salary).average(AgeSalary::setAvgSalary)
.build();
The first line defines the Aggregator to use and sets the constructor for result objects to AgeSalary::new. The second line declares the key for the aggregation: first how to find the key value in an incoming Person instance, and then how to set the key value in the result object. The third line is similar, but instead of a key it defines an average value to be computed from the salaries of the Person instances.
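To make the semantics concrete, the sketch below computes the same result, the average salary per age, using only the standard java.util.stream.Collectors. It is not the Speedment API and runs entirely on-heap, so it illustrates what is computed rather than how the Aggregator computes it. The simplified Person class, its accessor names, and the sample values are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AverageSalaryByAge {

    // Simplified stand-in for the Person entity (illustrative only).
    static final class Person {
        private final int age;
        private final double salary;

        Person(int age, double salary) {
            this.age = age;
            this.salary = salary;
        }

        int age() { return age; }
        double salary() { return salary; }
    }

    // Groups persons by age (the key) and averages the salary within each group.
    static Map<Integer, Double> averageSalaryByAge(List<Person> persons) {
        return persons.stream().collect(Collectors.groupingBy(
            Person::age,
            TreeMap::new, // sorted keys, so the output order is stable
            Collectors.averagingDouble(Person::salary)));
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
            new Person(30, 1000.0),
            new Person(30, 3000.0),
            new Person(41, 5000.0));

        // Prints one line per age group.
        averageSalaryByAge(persons)
            .forEach((age, avg) -> System.out.println(age + " -> " + avg));
    }
}
```

Each distinct key yields exactly one result entry, which mirrors the role of key(...) in the builder, while the downstream averaging collector plays the role of average(...).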
An Aggregator can produce a collector that can be used in any standard Java Stream. Thus, given a method persons() that returns a Stream<Person>, the average salaries can be computed as follows:
Aggregation<AgeSalary> aggregation = persons().collect(aggregator.createCollector());
The Aggregation holds the state of the aggregation data and allows repeated streaming over the data.
aggregation.streamAndClose()
.forEach(System.out::println);
Since the Aggregation may hold data that is stored off-heap, it may benefit from explicit closing rather than just being garbage collected. Closing the Aggregation can be done by calling the close() method, possibly by taking advantage of the AutoCloseable interface, or, as in the example above, by using streamAndClose(), which returns a stream that closes the Aggregation after stream termination.
In summary, the aggregation can be condensed as follows.
persons().collect(Aggregator.builderOfType(Person.class, AgeSalary::new)
.on(Person::age).key(AgeSalary::setAge)
.on(Person::salary).average(AgeSalary::setAvgSalary)
.build()
.createCollector()
).streamAndClose()
.forEach(System.out::println);
Note: If you intend to use Stream::iterator or Stream::spliterator, you cannot use streamAndClose(), because the auto-closing stream does not support these operations. Use a try-with-resources statement in combination with stream() instead.
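The try-with-resources approach mentioned above can be sketched as follows. Since this sketch does not depend on the Speedment libraries, CloseableResults is a hypothetical stand-in for an Aggregation: the only properties assumed are that it implements AutoCloseable and exposes a stream() method, as described in this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class IteratorWithTryWithResources {

    // Hypothetical stand-in for an Aggregation: AutoCloseable with a stream() method.
    static final class CloseableResults<T> implements AutoCloseable {
        private final List<T> results;
        private boolean closed;

        CloseableResults(List<T> results) {
            this.results = results;
        }

        Stream<T> stream() {
            if (closed) {
                throw new IllegalStateException("already closed");
            }
            return results.stream();
        }

        boolean isClosed() { return closed; }

        @Override
        public void close() {
            closed = true; // stand-in for releasing the underlying resources
        }
    }

    // Consumes the results through an Iterator while the resource is still open.
    static <T> List<T> drain(CloseableResults<T> results) {
        List<T> out = new ArrayList<>();
        try (CloseableResults<T> r = results) {
            Iterator<T> it = r.stream().iterator();
            while (it.hasNext()) {
                out.add(it.next());
            }
        } // close() is invoked here, after iteration has finished
        return out;
    }

    public static void main(String[] args) {
        CloseableResults<String> results =
            new CloseableResults<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(results));
        System.out.println(results.isClosed());
    }
}
```

The point of the pattern is that the iterator is fully consumed inside the try block, so the resource is closed only once no further elements will be requested.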
Aggregation to Generic Tuples
Sometimes designing an explicit result data class is overly verbose without adding much
clarity. In such cases, Speedment MutableTuples
can be used to create result data on the fly.
persons().collect(
Aggregator.builder(MutableTuples.constructor(Integer.class, Double.class))
.on(Person::age).key(MutableTuple2::set0)
.on(Person::salary).average(MutableTuple2::set1)
.build()
.createCollector()
).streamAndClose()
.forEach(System.out::println);
Derived Keys and Values
The functions supplied to the aggregator for finding and setting keys and result field values are general functions, so they do not need to be simple getters and setters as in the examples above. Using the predefined Speedment utilities for composing functions from basic building blocks, the example above can easily be extended to aggregate on decades instead of specific years. The key is then not the age but the age divided by 10 (so that, for example, all ages from 30 to 39 share the key 3), which can be expressed as follows.
Aggregator.builder(MutableTuples.constructor(Integer.class, Double.class))
.on(divide(Person::age, 10).asInt()).key(MutableTuple2::set0)
.on(Person::salary).average(MutableTuple2::set1)
.build()
Here, the method divide is statically imported from the Speedment utility class Expressions. Any kind of function can be used here, but using the Speedment utility functions allows the Speedment runtime to optimize the stream operations and is therefore potentially significantly more efficient.
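To illustrate what the derived key does, the following plain-Java sketch groups by decade using integer division. It deliberately uses an ordinary method reference and the standard Collectors rather than the Speedment Expressions utilities, so it shows only the semantics and not the optimized execution path. The simplified Person class, the decade helper, and the sample values are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AverageSalaryByDecade {

    // Simplified stand-in for the Person entity (illustrative only).
    static final class Person {
        private final int age;
        private final double salary;

        Person(int age, double salary) {
            this.age = age;
            this.salary = salary;
        }

        int age() { return age; }
        double salary() { return salary; }
    }

    // Derived key: ages 30..39 map to 3, ages 40..49 map to 4, and so on.
    static int decade(Person p) {
        return p.age() / 10; // integer division truncates the remainder
    }

    // Groups persons by decade and averages the salary within each decade.
    static Map<Integer, Double> averageSalaryByDecade(List<Person> persons) {
        return persons.stream().collect(Collectors.groupingBy(
            AverageSalaryByDecade::decade,
            TreeMap::new,
            Collectors.averagingDouble(Person::salary)));
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
            new Person(31, 1000.0),
            new Person(38, 2000.0),
            new Person(45, 4000.0));

        averageSalaryByDecade(persons)
            .forEach((decade, avg) -> System.out.println(decade + " -> " + avg));
    }
}
```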
As a second example, consider the following code, which computes the average BMI per gender of the persons in a data set.
Aggregator<Person, ?, Result> aggregator = Aggregator.builder(Result::new)
.on(Person::getGender)
.key(Result::setGender)
.on(shortToDouble(Person::getWeight)
.divide(Expressions.pow(
Expressions.divide(Person::getHeight, 100),
2)))
.average(Result::setBMI)
.build();
Here, the Result class is defined to have setter methods for BMI and gender, Result::setBMI and Result::setGender.
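The expression passed to on(...) corresponds to the usual BMI formula: weight divided by the square of the height in metres. The arithmetic can be checked with the plain-Java sketch below. It assumes that weight is stored in kilograms and height in centimetres (the source does not state the units, but the division by 100 suggests it). It uses the standard Collectors rather than the Speedment API, and the simplified Person class and sample values are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class AverageBmiByGender {

    // Simplified stand-in for the Person entity (illustrative only).
    static final class Person {
        private final String gender;
        private final short height; // assumed to be centimetres
        private final short weight; // assumed to be kilograms

        Person(String gender, int heightCm, int weightKg) {
            this.gender = gender;
            this.height = (short) heightCm;
            this.weight = (short) weightKg;
        }

        String getGender() { return gender; }
        short getHeight() { return height; }
        short getWeight() { return weight; }
    }

    // BMI = weight [kg] / (height [m])^2; dividing by 100.0 converts cm to m as a double.
    static double bmi(Person p) {
        return p.getWeight() / Math.pow(p.getHeight() / 100.0, 2);
    }

    // Groups persons by gender and averages the BMI within each group.
    static Map<String, Double> averageBmiByGender(List<Person> persons) {
        return persons.stream().collect(Collectors.groupingBy(
            Person::getGender,
            TreeMap::new,
            Collectors.averagingDouble(AverageBmiByGender::bmi)));
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
            new Person("F", 200, 80),
            new Person("F", 200, 120),
            new Person("M", 100, 30));

        averageBmiByGender(persons)
            .forEach((gender, avg) -> System.out.println(gender + " -> " + avg));
    }
}
```

Note the division by 100.0 rather than 100: with integral operands, plain Java would truncate the height to whole metres before squaring.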
Aggregating DataStore Data
The actual aggregation computations are performed in off-heap memory, meaning that garbage collection is not affected and that the size of the aggregated data is not bounded by the size of the heap.
In the examples above, the incoming data to aggregate consists of heap objects. No matter how the stream supplying the data creates them, all of these incoming objects need to be garbage collected at some point. To address this, Speedment supports aggregating off-heap data in place in a DataStore, minimizing the need for heap materialization and the garbage collection load it implies. This is achieved automatically if the Speedment aggregator is used to collect a stream from a DataStore.
Performance
The Aggregator stores intermediate results off-heap and can operate without creating any intermediate result objects during aggregation. If an Aggregator is used in conjunction with a Stream from a DataStore component, the Aggregator can provide additional performance benefits, such as extracting only the needed column values without actually materializing the entities in the stream.
Questions and Discussion
If you have any questions, don’t hesitate to reach out to the Speedment developers on Gitter.