Mastering Stream Processing - Viewing and analyzing results
by Bill Bejeck
This is the sixth blog in a series on windowing in event stream processing.
In this post, we’ll move on from covering the specific window implementations and discuss the viewing and analysis techniques for windowed results.
Flink SQL results
Given that Flink SQL renders results in a table, viewing the details of windowed operations is straightforward. Flink SQL adds three columns to the results of windowing TVF queries: window_start, window_end, and window_time (the last is omitted below for brevity). Flink SQL calculates the window_time field by subtracting 1 ms from the window_end value. So, results from a query with the window columns will look similar to the following:
Table results of the query
+------------------+-------+------+------------------+------------------+
| bidtime          | price | item | window_start     | window_end       |
+------------------+-------+------+------------------+------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 |
+------------------+-------+------+------------------+------------------+
So, the results plainly show the window each event belongs to. But directly running a windowing query and displaying the results in the console has one drawback: the results can’t easily be shared, and anyone else interested in them would need to craft and run their own queries. While running queries from a console is suitable for prototyping and testing different SQL statements, it doesn’t lend itself to organization-wide distribution of results. A better approach is to store the results of a windowed query in another table whose schema and existence can be circulated within the organization.
For example, consider a query that will generate an alert when an average reading exceeds a given threshold. Going back to the blog post on [LINK] OVER aggregations, you first generated a query to perform an aggregation per row:
SQL OVER aggregation with aggregated results per row
SELECT location, device_id, report_time,
       AVG(temp_reading) OVER (
           PARTITION BY location
           ORDER BY report_time
           RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
       ) AS one_minute_location_temp_averages
FROM readings;
This query does all the work of generating an average over a sliding one-minute range from the readings table, but we still have some work to do to capture the alerting state we’re interested in. At this point, we have two possible approaches: create a new table with all the aggregations, or create a new table with only the alert data.
Let’s create a table with all the aggregations generated by this query:
Create a Table with all aggregations
CREATE TABLE reading_averages (
    location STRING,
    device_id STRING,
    report_time TIMESTAMP(3),
    reading_averages DOUBLE);
Now you’ll write a persistent query that will continually update this table with the results of our OVER aggregation:
Populating a table with OVER aggregation results
INSERT INTO reading_averages
SELECT location, device_id, report_time,
       AVG(temp_reading) OVER (
           PARTITION BY location
           ORDER BY report_time
           RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
       ) AS one_minute_location_temp_averages
FROM readings;
Here, we’ve wrapped our OVER aggregation with an INSERT statement to push the results into our table for later analysis. To take this further, imagine we want a table containing only the alert state. We can still use the same table schema, with a slight change to the column names:
Create a Table with only alert aggregations
CREATE TABLE reading_alerts (
    location STRING,
    device_id STRING,
    report_time TIMESTAMP(3),
    reading_alerts DOUBLE);
Now you’ll follow a similar approach, but with two nested queries: the inner query performs the OVER aggregation, and the outer query pulls out the records that meet the alerting threshold:
Extracting only the alert results
SELECT location, device_id, report_time, avg_temps
FROM (
    SELECT location, device_id, report_time,
           AVG(temp_reading) OVER (
               ORDER BY report_time
               RANGE BETWEEN INTERVAL '15' MINUTE PRECEDING AND CURRENT ROW
           ) AS avg_temps
    FROM readings
)
WHERE avg_temps > SOME_VALUE;
At this point, we’re almost there; we just need to get this data into a table. We’ll take the same approach as before and wrap this SELECT statement with an INSERT to push the results to a table:
Pushing only alert results to a table
INSERT INTO reading_alerts
SELECT location, device_id, report_time, avg_temps
FROM (
    SELECT location, device_id, report_time,
           AVG(temp_reading) OVER (
               ORDER BY report_time
               RANGE BETWEEN INTERVAL '15' MINUTE PRECEDING AND CURRENT ROW
           ) AS avg_temps
    FROM readings
)
WHERE avg_temps > SOME_VALUE;
Now we have a table containing only the alert results, and anyone in the organization can access it.
Kafka Streams results
With Kafka Streams, you have a couple of choices when it comes to evaluating the windowed results. Let’s revisit the sliding window example:
Sliding windows in Kafka Streams
Serde<Windowed<String>> windowedSerde =
    WindowedSerdes.timeWindowedSerdeFrom(String.class, 60_000L);

KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
                   Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()
    .windowedBy(
        SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))
    )
    .aggregate(() -> new IotSensorAggregation(tempThreshold),
               aggregator,
               Materialized.with(stringSerde, aggregationSerde))
    .toStream()
    .to("sensor-agg-output",
        Produced.with(windowedSerde, aggregationSerde));
As it stands here, this will produce results to Apache Kafka®. To analyze the windowed results, i.e., to consume from the output topic, you would need to use the Serde<Windowed<String>> class to get the deserializer for the key, which means you’re leaking specific details of the streaming application. Additionally, I find it awkward to have the window start and end in the key, separated from the value. Kafka Streams needs to store the window in the key during processing to ensure records are handled appropriately as time advances, but once it emits a windowed aggregation, there’s no need to keep the window in the key.
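To make that drawback concrete, here is a hedged sketch of what a downstream consumer of the raw output topic would have to do just to decode the keys; consumerProps stands in for ordinary consumer properties, and IotSensorAggregationDeserializer is a placeholder name for whatever deserializer backs the aggregationSerde in the topology:
Consuming the raw windowed output (sketch)
// Sketch only: the consumer must reconstruct the exact windowed serde the
// streams application used internally, including the 60-second window size,
// which leaks internal details of that application.
Deserializer<Windowed<String>> windowedKeyDeserializer =
    WindowedSerdes.timeWindowedSerdeFrom(String.class, 60_000L).deserializer();

KafkaConsumer<Windowed<String>, IotSensorAggregation> consumer =
    new KafkaConsumer<>(consumerProps,
                        windowedKeyDeserializer,
                        new IotSensorAggregationDeserializer()); // placeholder value deserializer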
Instead, I’d propose mapping a new value that contains the start and end times. Taking this a step further, I suggest adding two long fields to your aggregation so that adding the window times is simple. From there, you’ll update the topology to use a map operation to extract the window information and place it in the aggregation value. But before we do that, let’s create a KeyValueMapper that will know how to extract the window start and end:
KeyValueMapper to get window start and end
public class WindowTimeToAggregateMapper implements
        KeyValueMapper<Windowed<String>, IotSensorAggregation, KeyValue<String, IotSensorAggregation>> {

    @Override
    public KeyValue<String, IotSensorAggregation> apply(Windowed<String> windowed,
                                                        IotSensorAggregation iotSensorAggregation) {
        long start = windowed.window().start();
        long end = windowed.window().end();
        iotSensorAggregation.setWindowStart(start);
        iotSensorAggregation.setWindowEnd(end);
        return KeyValue.pair(windowed.key(), iotSensorAggregation);
    }
}
- Extracting the window start and end times
- Setting the window start and end times on the aggregation object
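For this mapper to compile, the aggregation class needs the two long fields suggested above. Here is a minimal sketch of that addition; only the threshold constructor and the window accessors are implied by the topology and mapper code, and any other fields the real class carries are omitted:
Adding window start and end fields to the aggregation (sketch)
public class IotSensorAggregation {

    private final double tempThreshold; // threshold type assumed to be double
    // The two added fields carrying the window boundaries (epoch milliseconds)
    private long windowStart;
    private long windowEnd;

    public IotSensorAggregation(double tempThreshold) {
        this.tempThreshold = tempThreshold;
    }

    public void setWindowStart(long windowStart) {
        this.windowStart = windowStart;
    }

    public void setWindowEnd(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public long getWindowStart() {
        return windowStart;
    }

    public long getWindowEnd() {
        return windowEnd;
    }
}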
Since the KeyValueMapper is a single abstract method (SAM) interface, we could define it inline as a lambda in the Kafka Streams topology, but creating a concrete class makes it easier to test. Now you need to plug this into the Kafka Streams DSL:
Adding the KeyValue mapper into the topology
KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
                   Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()
    .windowedBy(
        SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))
    )
    .aggregate(() -> new IotSensorAggregation(tempThreshold),
               aggregator,
               Materialized.with(stringSerde, aggregationSerde))
    .toStream()
    .map(new WindowTimeToAggregateMapper())
    .to("sensor-agg-output",
        Produced.with(stringSerde, aggregationSerde));
- Applying the KeyValueMapper to extract the window start and end times
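As noted above, because KeyValueMapper is a SAM interface, the same mapping could also be written inline as a lambda instead of the concrete class; the concrete class simply makes unit testing easier. A sketch of the inline alternative, dropped into the same spot in the topology:
Inline lambda alternative to the concrete KeyValueMapper (sketch)
// Equivalent to .map(new WindowTimeToAggregateMapper())
.map((windowedKey, aggregation) -> {
    aggregation.setWindowStart(windowedKey.window().start());
    aggregation.setWindowEnd(windowedKey.window().end());
    return KeyValue.pair(windowedKey.key(), aggregation);
})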
Now, with the addition of the KStream.map operator using the new KeyValueMapper, you’ve updated your aggregation to include the start and end of the window. Since you’ve also extracted the underlying key from its Windowed wrapper, you’ll switch the key Serde in Produced from the windowed Serde to the plain String Serde to reflect the change in types. When you analyze the aggregation results, you’ll have direct access to the window start and end times.
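For example, a downstream consumer can now read the output topic with an ordinary String key deserializer and pull the window times straight out of the value. A hedged sketch, where IotSensorAggregationDeserializer is again a placeholder for the deserializer behind aggregationSerde and the window fields are assumed to hold epoch milliseconds:
Consuming the enriched aggregation results (sketch)
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SensorAggregationConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sensor-agg-analysis");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // A plain String key deserializer is enough now -- no windowed serde
        // required, because the window times travel inside the value.
        try (KafkaConsumer<String, IotSensorAggregation> consumer =
                     new KafkaConsumer<>(props,
                                         new StringDeserializer(),
                                         new IotSensorAggregationDeserializer())) { // placeholder deserializer
            consumer.subscribe(List.of("sensor-agg-output"));
            while (true) {
                ConsumerRecords<String, IotSensorAggregation> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, IotSensorAggregation> record : records) {
                    IotSensorAggregation aggregation = record.value();
                    // Window boundaries assumed to be epoch milliseconds
                    System.out.printf("sensor=%s window=[%s, %s] aggregation=%s%n",
                            record.key(),
                            Instant.ofEpochMilli(aggregation.getWindowStart()),
                            Instant.ofEpochMilli(aggregation.getWindowEnd()),
                            aggregation);
                }
            }
        }
    }
}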
Kafka Streams also allows you to directly observe the results of the aggregation from its state store via Interactive Queries. I won’t go into those details here, but you can view a presentation on building an Interactive Query service from the 2022 Kafka Summit and take a look at the accompanying source code.