Building Business Dashboards with Micronaut and Elasticsearch Aggregations Framework – Part 1

We all know and love Elasticsearch great full-text query capabilities. But Elasticsearch offers much more great functionalities and aggregations framework is one of them. In this twopart post we will walk through creation of fully usable business dashboard build with help of Micronaut framework, Elasticsearch and Chart.js. 

We are going to add a new microservice to our LAB Insurance Sales Portal sample microservice based solution. This microservice called Dashboard Service will be responsible for gathering data about sold policies and providing statistical data for dashboard. 

micronaut labbox

Policy Service will publish events to Kafka every time a new policy is sold. Dashboard Service will subscribe for these events and will index sold policy data into Elasticsearch instance. 

Dashboard Service will expose REST API that will allow frontend to query for statistical data like: 

  • total sales and number of policies sold per time period,
  • sales amounts and number of policies sold for given insurance products,
  • sales results for agents or sales result presented on timeline for given period.

The result is presented on the picture below, where you can see frontend app written using Vue.js that display all this statistical data and allows user to apply different filters. 

micronaut statistics

At the top you can see filtering options: you can filter by insurance product and by time period. Below in one row you can see total sales and total number of policies sold followed by sales results for each product. Next rows contain a line chart showing sales results for given time period, bar chart showing sales for each agent and pie chart showing how sales is divided into products.

First part will focus on gathering the data in Elasticsearch indexes and using aggregations framework to build API that will allow us to query statistical data. Second part will present how to consume this API and visualize it with Chart.js.

Source code for our solution is available on GitHub: https://github.com/asc-lab/micronaut-microservices-poc. 

Let’s see what we need to do to implement such solution.

Getting and storing the data

In the first step we must gather that data on which Elasticsearch aggregations framework will operate. Apart from adding necessary dependencies in pom.xml we must configure Kafka support in application.yml

---
kafka:
  bootstrap:
    servers: localhost:9092

Now, with help of Micronaut declarative Kafka support creation of a listener is just a matter of adding two annotations to a class.

@KafkaListener(clientId = "policy-registered-dashboard-listener", offsetReset = OffsetReset.EARLIEST)
@RequiredArgsConstructor
public class PolicyRegisteredListener {

    private final PolicyRepository policyRepository;

    @Topic("policy-registered")
    void onPolicyRegistered(PolicyRegisteredEvent event) {
        policyRepository.save(new PolicyDocument(
                event.getPolicy().getNumber(),
                event.getPolicy().getFrom(),
                event.getPolicy().getTo(),
                event.getPolicy().getPolicyHolder(),
                event.getPolicy().getProductCode(),
                event.getPolicy().getTotalPremium(),
                event.getPolicy().getAgentLogin()
        ));
    }
}

Our listener extracts data from message and saves it in Elasticsearch index. PolicyDocument is simple POJO class with properties representing sold policy: number, validity period, policy owner name and surname, insurance product code, total premium amount (price) and login of an agent who sold given policy.

We need to create a class responsible for saving and loading data from Elasticsearch. This class is PolicyElasticRepository and it uses RestHighLevelClient to communicate with Elasticsearch. 

    [...]
    private final RestHighLevelClient esClient;
    private final JsonConverter jsonConverter;

    public void save(PolicyDocument policyDocument) {
        IndexRequest indexRequest = new IndexRequest("policy_stats")
                .type("policy_type")
                .id(policyDocument.getNumber())
                .setRefreshPolicy("true")
                .source(jsonConverter.stringifyObject(policyDocument), XContentType.JSON);

        try {
            esClient.index(indexRequest);
        } catch (IOException e) {
            throw new RuntimeException("Error while executing query", e);
        }
    }

It is important to properly setup Jackson JSON serialization as data in Elasticsearch is stored as JSON. In order to support java.time classes we must register JavaTimeModule. One way to do this is by implementing BeanCreatedEventListener and customizing an instance of ObjectMapper already created by the framework. 

@Singleton
public class ObjectMapperBeanEventListener implements BeanCreatedEventListener {
    @Override
    public ObjectMapper onCreated(BeanCreatedEvent event) {
        final ObjectMapper mapper = event.getBean();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }
}

Complete source code for implementation of repository using Elasticsearch can be found here 

Getting total sales and policies count for each product using Elasticsearch aggregations

Now when sales data are stored in an index, we can start implementing query functionality that will get statistics. Elasticsearch aggregations framework offers several kinds of aggregations, out of which we are going to use bucketing. Other are metrics, matrix and pipeline. You can learn more from official documentation. 

With bucketing documents are grouped into buckets and we can calculate various values per bucket. Bucket aggregations also support nesting. This means that we can run nested aggregation over data in each bucket. That is exactly what we need for our query that is supposed to give us total number of policies sold and then calculate sum of total premiums on policies in a bucket, and we need this information for each insurance product.

The code that builds such query is presented below (full class is available here):

        SearchRequest searchRequest = new SearchRequest("policy_stats")
                .types("policy_type");

        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
        if (query.getFilterByProductCode()!=null) {
            filterBuilder.must(QueryBuilders.termQuery("productCode.keyword", query.getFilterByProductCode()));
        }
        if (query.getFilterBySalesDate()!=null){
            RangeQueryBuilder datesRange = QueryBuilders
                    .rangeQuery("from")
                    .gte(query.getFilterBySalesDate().getFrom().toString())
                    .lt(query.getFilterBySalesDate().getTo().toString());
            filterBuilder.must(datesRange);
        }
        AggregationBuilder aggBuilder = AggregationBuilders.filter("agg_filter",filterBuilder);

        TermsAggregationBuilder sumAggBuilder = AggregationBuilders
                .terms("count_by_product")
                .field("productCode.keyword")
                .subAggregation(AggregationBuilders.sum("total_premium").field("totalPremium"));
        aggBuilder.subAggregation(sumAggBuilder);

        SearchSourceBuilder srcBuilder = new SearchSourceBuilder()
                .aggregation(aggBuilder)
                .size(0);
        searchRequest.source(srcBuilder);

        return searchRequest;

We create an instance of SearchRequest, then we use BoolQueryBuilder to build a filter that will allow us to perform aggregations only on documents matching specific product and period in which it was sold (we treat policy start date as the date on which it was sold). Finally, we use create TermsAggregation that will count number of policies for given productCode. This is like using GROUP BY SQL clause.  

With policies grouped by productCode we create sub-aggregation that will calculate sum of total premiums for each group of policies.  

We connect all the pieces together and set result size to 0 because we want only aggregation results not the actual documents.

We can execute this query using RestHighLevelClient search method. Now we need to extract results: 

        TotalSalesQuery.Result.ResultBuilder result = TotalSalesQuery.Result.builder();
        long count = 0;
        BigDecimal amount = BigDecimal.ZERO;
        Filter filterAgg = searchResponse.getAggregations().get("agg_filter");
        Terms products = filterAgg.getAggregations().get("count_by_product");
        for (Terms.Bucket b : products.getBuckets()){
            count += b.getDocCount();
            Sum sum = b.getAggregations().get("total_premium");
            amount = amount.add(BigDecimal.valueOf(sum.getValue()).setScale(2,BigDecimal.ROUND_HALF_UP));
            result.productTotal(b.getKeyAsString(), SalesResult.of(b.getDocCount(),BigDecimal.valueOf(sum.getValue())));
        }
        result.total(SalesResult.of(count,amount));

We need to get our filtered aggregations by name (agg_filter), then we can get our terms aggregation count_by_products. This will allow us to fetch buckets with results and we can get product code (getKeyAsString on bucket) and number of policies for that product (getDocCount). Here we can also access sub-aggregations list, where our “total_premium” sum aggregation should be found. Now we can collect results for each product and use it to have totals for all products. 

The whole thing is nicely packaged into PolicyRepository, TotalSalesQueryAdapter and GetTotalSalesQueryHandler, and made available for the outside world via controller. 

@RequiredArgsConstructor
@Validated
@Controller("/dashboard")
public class DashboardController implements DashboardOperations {

    private final CommandBus bus;

    @Override
    public GetTotalSalesQueryResult queryTotalSales(GetTotalSalesQuery query) {
        return bus.executeQuery(query);
    }

    [...]
}

Getting total sales and policies count for each agent using Elasticsearch aggregations

Next query extract sales results for agents. For given time period and product we need to get total sum of total premium on policies that given agent sold.

Process of building such a query and extraction of results is almost the same as in case of previous query. As you can see in AgentSalesQueryAdapter class the only difference is that we use agentLogin.keyword field as key and we do not need to calculate totals while extracting results. 

        TermsAggregationBuilder sumAggBuilder = AggregationBuilders
                .terms("count_by_agent")
                .field("agentLogin.keyword")
                .subAggregation(AggregationBuilders.sum("total_premium").field("totalPremium"));
        aggBuilder.subAggregation(sumAggBuilder);

Query is handled by GetAgentsSalesQueryHandler and added to DashboardController so it can be used from client application. 

Getting sales trends using Elasticsearch aggregations

Last query we are going to analyze is supposed to show us sales trends by returning total sales calculated for each unit of time in given time period.  We are going to use new aggregation type here – Date Histogram Aggregation. 

        SearchRequest searchRequest = new SearchRequest("policy_stats")
                .types("policy_type");

        BoolQueryBuilder filterBuilder = QueryBuilders.boolQuery();
        if (query.getFilterByProductCode()!=null) {
            filterBuilder.must(QueryBuilders.termQuery("productCode.keyword", query.getFilterByProductCode()));
        }
        if (query.getFilterBySalesDate()!=null){
            RangeQueryBuilder datesRange = QueryBuilders
                    .rangeQuery("from")
                    .gte(query.getFilterBySalesDate().getFrom().toString())
                    .lt(query.getFilterBySalesDate().getTo().toString());
            filterBuilder.must(datesRange);
        }
        AggregationBuilder aggBuilder = AggregationBuilders.filter("agg_filter",filterBuilder);

        DateHistogramAggregationBuilder histBuilder = AggregationBuilders
                .dateHistogram("sales")
                .field("from")
                .dateHistogramInterval(query.getAggregationUnit().toDateHistogramInterval())
                .subAggregation(AggregationBuilders.sum("total_premium").field("totalPremium"));
        aggBuilder.subAggregation(histBuilder);

        SearchSourceBuilder srcBuilder = new SearchSourceBuilder()
                .aggregation(aggBuilder)
                .size(0);
        searchRequest.source(srcBuilder);

        return searchRequest;

As in previous query we start by building a filter that will limit the scope for our aggregation only to policies sold within given time period for given insurance product. Then we build Date Histogram Aggregation where we specify a field which will be used for grouping and time unit over which we aggregate. In our case this can be a day, a week, a month or a year, but Elasticsearch supports more time units (minutes, seconds). Now our filtered policies will be grouped by a value of from field into units of time specified. For example, if we specify TimeAggregationUnit=MONTH, all policies with from data in given month will be grouped together. Last thing is adding sub-aggregation to sum premium amounts for each group.

Results extraction is like previous ones, but we deal here with different type of bucket.

        SalesTrendsQuery.Result.ResultBuilder result = SalesTrendsQuery.Result.builder();

        Filter filterAgg = searchResponse.getAggregations().get("agg_filter");
        Histogram agg = filterAgg.getAggregations().get("sales");
        for (Histogram.Bucket b : agg.getBuckets()){
            DateTime key = (DateTime)b.getKey();
            Sum sum = b.getAggregations().get("total_premium");
            result.periodSale(
                    new SalesTrendsQuery.PeriodSales(
                            LocalDate.of(key.getYear(),key.getMonthOfYear(),key.getDayOfMonth()),
                            b.getKeyAsString(),
                            SalesResult.of(b.getDocCount(), BigDecimal.valueOf(sum.getValue()).setScale(2, BigDecimal.ROUND_HALF_UP))
                    )
            );
        }

        return result.build();

Here we get our “sales” aggregation and iterate over histogram buckets. Each bucket represents group of policies sold within given time period. We extract a key for each bucket (period). The key here is a date representing first day of a period associated with a bucket. For example, if we were searching for sales divided into months, the it is a first day of given month, if we were searching for sales divided into weeks then it will be first day of a given week. We also get total number of policies sold in given time by extracting documents count within the bucket. We also access sub-aggregations list to het our sum aggregation that gives us total amount of premiums for given period.

Testing

Most of the code in our solution deals with building queries, execution it and extracting the results. This means that testing this code without interaction with actual data stored in Elasticsearch does not make much sense, and we need to talk to “real” instance of Elasticsearch in out tests. There are many ways to do this. For example, we can use Test Containers Elasticsearch module. 

Another option, which does not involve Docker, is use of embedded Elasticsearch in tests. We must add the following dependency to pom.xml 

    <dependency>
      <groupId>pl.allegro.tech</groupId>
      <artifactId>embedded-elasticsearch</artifactId>
      <version>2.10.0</version>
      <scope>test</scope>
    </dependency>

Then we can start instance of Elasticsearch from within our test. Below is a little helper class that configures and starts an instance of Elasticsearch for our tests.

public class DashboardEmbeddedElastic {
    private static EmbeddedElastic embeddedElastic = null;

    static EmbeddedElastic getInstance() {
        if (embeddedElastic == null) {
            try {
                embeddedElastic = createAndRun();
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException("Cannot start embedded Elastic", e);
            }
        }
        return embeddedElastic;
    }

    private static EmbeddedElastic createAndRun() throws IOException, InterruptedException {
        return EmbeddedElastic.builder()
                .withElasticVersion("6.6.2")
                .withSetting(PopularProperties.TRANSPORT_TCP_PORT, 9350)
                .withSetting(PopularProperties.HTTP_PORT, 9351)
                .withSetting(PopularProperties.CLUSTER_NAME, "my_cluster")
                .build()
                .start();

    }
}

In our tests we can now create an instance of Elasticsearch and use it.

static EmbeddedElastic el = DashboardEmbeddedElastic.getInstance();

We point high level elastic client to that instance and initialize our repository classes. You can see the whole test for one of our queries here. 

Summary

In the first part of this post we presented how you can use Elasticsearch in Micronaut based microservice to build a dashboard that presents sales information. We can build additional functionalities on the top of it. For example, we can consider structure of our organization and for agents who are supervisors we can present their teams results, while individual agents can see only their results.

Aggregations framework is a vast topic and such a short introduction will only help you getting started. If you are interested and want to learn more than Elasticsearch official documentation.

Below you can find links to 6.3 version documentation of Java API, which was used to implement this sample application (note that current version in 7.4):

Part two will present how you can use Chart.js and Vue.js with our Dashboard Service API to visualize statistical data.

Stay tuned!