This is the sixth blog in a series on windowing in event stream processing. Here’s a list of the previous posts:

  1. Introduction to windowing

  2. Hopping and Tumbling windows

  3. Sliding windows and OVER aggregation

  4. Session windows and Cumulating windows

  5. Window time semantics

In this post, we’ll move on from the specific window implementations and discuss techniques for viewing and analyzing windowed results.

Flink SQL results

Since Flink SQL renders results as a table, viewing the details of windowed operations is straightforward. Flink SQL adds three columns to the output of windowing table-valued function (TVF) queries: window_start, window_end, and window_time (the last is omitted below for brevity). Flink SQL calculates the window_time field by subtracting 1 ms from the window_end value. So, results from a query with the window columns will look similar to the following:

Table results of the query

+------------------+-------+------+------------------+------------------+
|          bidtime | price | item |     window_start |       window_end |
+------------------+-------+------+------------------+------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 |
+------------------+-------+------+------------------+------------------+
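
For reference, a tumbling-window TVF query along these lines could have produced output shaped like the table above; the bid table, its event-time column bidtime, and the 10-minute window size are inferred from the sample rows rather than from a schema defined in this post:

Tumbling window TVF query (illustrative)

    SELECT bidtime, price, item, window_start, window_end
      FROM TABLE(
             TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));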

So, the results plainly show the window each event belongs to. But directly running a windowing query and displaying the results in the console has one drawback: the results can’t easily be shared, and anyone else interested in them would need to craft and run their own queries. While running queries from a console is suitable for prototyping and testing different SQL statements, it doesn’t lend itself to organization-wide distribution of results. A better approach is to store the results of a windowed query in another table whose schema and existence can be circulated within the organization.

For example, consider a query that generates an alert when an average reading exceeds a given threshold. Going back to the blog post on [LINK] OVER aggregations, you first wrote a query to perform an aggregation per row:

SQL OVER aggregation with aggregated results per row


    SELECT location, device_id, report_time,
           AVG(temp_reading) OVER (
             PARTITION BY location
             ORDER BY report_time
             RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
           ) AS one_minute_location_temp_averages
    FROM readings;

This query does all the work of generating an average over a sliding range of 1 minute from the readings table. We have some work to do to make it capture the alerting state we’re interested in. At this point, we have two possible approaches: creating a new table with the aggregations or creating a new table with only the alert data.

Let’s create a table with all the aggregations generated by this query:

Create a Table with all aggregations

    CREATE TABLE reading_averages (location STRING,
                                   device_id STRING,
                                   report_time TIMESTAMP(3),
                                   reading_averages DOUBLE);

Now you’ll write a persistent query that will continually update this table with the results of our OVER aggregation:

Populating a table with OVER aggregation results

    INSERT INTO reading_averages
      SELECT location, device_id, report_time,
             AVG(temp_reading) OVER (
               PARTITION BY location
               ORDER BY report_time
               RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
             ) AS one_minute_location_temp_averages
      FROM readings;

Here, we’ve wrapped our OVER aggregation in an INSERT statement to push the results into our table for later analysis. To take this further, imagine we want a table containing only the alert records. We can reuse the same table schema with a slight change to the column names:

Create a Table with only alert aggregations

    CREATE TABLE reading_alerts (location STRING,
                                 device_id STRING,
                                 report_time TIMESTAMP(3),
                                 reading_alerts DOUBLE);

Now you’ll follow a similar approach, but with a nested query: the inner query performs the OVER aggregation, and the outer query pulls out the records meeting the alerting threshold:

Extracting only the alert results

    SELECT location, device_id, report_time, avg_temps
    FROM (
           SELECT location, device_id, report_time,
                  AVG(temp_reading) OVER (
                    ORDER BY report_time
                    RANGE BETWEEN INTERVAL '15' MINUTE PRECEDING AND CURRENT ROW
                  ) AS avg_temps
           FROM readings
    )
    WHERE avg_temps > SOME_VALUE;

At this point, we’re almost there; we just need to get this data into a table. Taking the same approach as before, we’ll wrap this SELECT statement in an INSERT to push the results into the table:

Pushing only alert results to a table

    INSERT INTO reading_alerts
      SELECT location, device_id, report_time, avg_temps
      FROM (
             SELECT location, device_id, report_time,
                    AVG(temp_reading) OVER (
                      ORDER BY report_time
                      RANGE BETWEEN INTERVAL '15' MINUTE PRECEDING AND CURRENT ROW
                    ) AS avg_temps
             FROM readings
      )
      WHERE avg_temps > SOME_VALUE;

Now we have a table containing only the alert results that anyone in the organization can access.
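
For example, a downstream consumer can query the alerts directly without knowing anything about how they were produced (the location value below is purely illustrative):

Querying the alerts table

    SELECT location, device_id, report_time, reading_alerts
    FROM reading_alerts
    WHERE location = 'warehouse-1';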

Kafka Streams results

With Kafka Streams, you have a couple of choices when it comes to evaluating the windowed results. Let’s revisit the sliding window example:

Sliding windows in Kafka Streams

    // Serde for the windowed key; the 60,000 ms window size matches the 1-minute sliding window below
    Serde<Windowed<String>> windowedSerde =
            WindowedSerdes.timeWindowedSerdeFrom(String.class, 60_000L);

    KStream<String, Double> iotHeatSensorStream =
      builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

    iotHeatSensorStream.groupByKey()
          .windowedBy(
                      SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))
                      )
            .aggregate(() -> new IotSensorAggregation(tempThreshold),
             aggregator,
             Materialized.with(stringSerde, aggregationSerde))
             .toStream().to("sensor-agg-output",
               Produced.with(windowedSerde, aggregationSerde));

As it stands, this topology produces its results to Apache Kafka®. To analyze the windowed results, i.e., consume from the output topic, you would need the Serde<Windowed<String>> class to get the deserializer for the key, which leaks specific details of the streaming application to downstream consumers. Additionally, I find it awkward that the window start and end live in the key while the aggregation lives in the value. Kafka Streams needs to store the window in the key during processing to ensure records are handled appropriately as time advances, but once it emits a windowed aggregation, we don’t need to keep the window in the key.

Instead, I’d propose mapping to a new value that contains the start and end times. Taking this a step further, I suggest adding two long fields to your aggregation so that carrying the window times is simple (a sketch of the updated aggregation class follows the mapper below). From there, you’ll update the topology with a map operation that extracts the window information and places it in the aggregation value. But before we do that, let’s create a KeyValueMapper that knows how to extract the window start and end:

KeyValueMapper to get window start and end

    public class WindowTimeToAggregateMapper implements KeyValueMapper<Windowed<String>,
                                                                      IotSensorAggregation,
                                                                      KeyValue<String, IotSensorAggregation>> {
        @Override
        public KeyValue<String, IotSensorAggregation> apply(Windowed<String> windowed,
                                                            IotSensorAggregation iotSensorAggregation) {
            long start = windowed.window().start(); 
            long end = windowed.window().end();

            iotSensorAggregation.setWindowStart(start); 
            iotSensorAggregation.setWindowEnd(end);

            return KeyValue.pair(windowed.key(), iotSensorAggregation);
        }
    }
  1. Extracting the window start and end times

  2. Setting the window start and end time on the aggregation object
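
For reference, here’s a minimal sketch of what the aggregation class could look like with the two window fields added. The setter names match the ones the mapper calls above; everything else about IotSensorAggregation (its constructor, the running aggregation state, and so on) is assumed and omitted here:

Window time fields on the aggregation (sketch)

    public class IotSensorAggregation {
        // existing aggregation state (temperature threshold, running average, etc.) omitted

        // window boundaries in epoch milliseconds, copied from the Windowed key by the mapper
        private long windowStart;
        private long windowEnd;

        public long windowStart() { return windowStart; }
        public void setWindowStart(long windowStart) { this.windowStart = windowStart; }

        public long windowEnd() { return windowEnd; }
        public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }
    }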

Since KeyValueMapper is a single abstract method (SAM) interface, we could define it inline as a lambda in the Kafka Streams topology, but a concrete class is easier to test. Now you need to plug this into the Kafka Streams DSL:

Adding the KeyValue mapper into the topology

    KStream<String,Double> iotHeatSensorStream =
      builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

    iotHeatSensorStream.groupByKey()
          .windowedBy(
                      SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))
                      )
            .aggregate(() -> new IotSensorAggregation(tempThreshold),
             aggregator,
             Materialized.with(stringSerde, aggregationSerde))
             .toStream()
             .map(new WindowTimeToAggregateMapper()) 
             .to("sensor-agg-output",
               Produced.with(stringSerde, aggregationSerde));
  1. Applying the KeyValueMapper to extract the window start and end times.

Now, with the addition of the KStream.map operator and the new KeyValueMapper, you’ve updated your aggregation to include the start and end of the window. Since you’ve also unwrapped the underlying key from its Windowed container, you’ll switch the Serdes in Produced to reflect the change in types. When you analyze the aggregation results, you’ll have direct access to the window start and end times.
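
The payoff shows up on the consuming side: a downstream application no longer needs a windowed Serde at all. Here’s a rough sketch of such a consumer; it assumes the aggregation class has the accessors sketched earlier, and the bootstrap server and group id are illustrative (imports and most configuration are trimmed, as in the other snippets):

Consuming the aggregation results with plain Serdes (sketch)

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sensor-agg-analysis");

    try (KafkaConsumer<String, IotSensorAggregation> consumer =
             new KafkaConsumer<>(props,
                                 new StringDeserializer(),
                                 aggregationSerde.deserializer())) {
        consumer.subscribe(List.of("sensor-agg-output"));
        ConsumerRecords<String, IotSensorAggregation> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, IotSensorAggregation> record : records) {
            IotSensorAggregation aggregation = record.value();
            // The window boundaries now travel inside the value, so a plain String key works fine
            System.out.printf("%s window [%d - %d]%n",
                              record.key(),
                              aggregation.windowStart(),
                              aggregation.windowEnd());
        }
    }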

Kafka Streams also allows you to directly observe the results of the aggregation from its state store via Interactive Queries. I won’t go into those details here, but you can view a presentation on building an Interactive Query service from the 2022 Kafka Summit and take a look at the accompanying source code.

Resources