Mastering Stream Processing - Hoppping and Tumbling windows
by Bill Bejeck
In the first post of this series, we discussed what event streaming windowing is, and we examined in detail the structure of a windowed aggregate in Kafka Streams and Flink SQL.
In this post, we’ll dive into two specific windowing implementations: hopping and tumbling windows.
Hopping windows
A hopping window has a fixed time length, and it moves forward or "hops" at a time interval smaller than the window’s length. For example, a hopping window can be one minute long and advance every ten seconds. The following illustration demonstrates the concept of a hopping window:
So, from the illustration above, hopping windows can produce overlapping results. A hop forward can include results contained in the previous window. Let’s look at another illustration demonstrating this concept:
Walking through the picture:
-
Window one starts at 12:00:00 PM and will collect data until 12:01:00 PM (end time is exclusive).
-
At 12:00:30 PM, due to the thirty-second advance, window two starts gathering data.
-
Window one and window two will share data for thirty seconds from the start of window two until the end of window one. The process continues with each window advance.
Let’s show how you would implement a hopping window in Kafka Streams and Flink SQL.
Kafka Streams hopping window
For a hopping windowed aggregation in Kafka Streams, you’ll use one of the factory methods in the TimeWindows class:
A Kafks Streams hopping window example
KStream<String,Double> iotHeatSensorStream =
builder.stream("heat-sensor-input",
Consumed.with(stringSerde, doubleSerde));
iotHeatSensorStream.groupByKey()
.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
.advanceBy(Duration.ofSeconds(30))
)
.aggregate(() -> new IotSensorAggregation(tempThreshold),
aggregator,
Materialized.with(stringSerde, aggregationSerde))
.toStream().to("sensor-agg-output",
Produced.with(windowedSerde, aggregationSerde))
-
By using
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
sets the window size at one minute, thewithNoGrace
means Kafka Streams will drop any out-of-order records that would have been included in the window had they arrived in order. We’ll get into grace periods more in the blog post on windowing time semantics. -
The
.advanceBy(Duration.ofSeconds(30)
call makes this a hopping window. It creates a window that is one minute in size and advances every ten seconds.
Next, let’s move on to hopping windows with Flink SQL.
Flink SQL hopping window
Note that Flink hopping windows can also be referred to as sliding windows. Kafka Stream offers a sliding window variant that behaves differently from its hopping window offering. So, for clarity, we’ll only refer to Flink windows with an advance smaller than the window size as hopping windows.
Hopping window average with Flink SQL
SELECT window_start,
window_end,
device_id,
AVG(reading) AS avg_reading
FROM TABLE(HOP
(TABLE device_readings,
DESCRIPTOR(ts),
INTERVAL '30' SECONDS,
INTERVAL '1' MINUTES
))
GROUP BY window_start,
window_end,
device_id
-
Specifying hopping windows by passing the
HOP
function to theTABLE
function. -
The table you’ll use as the source for the hopping window aggregation.
-
The
DESCRIPTOR
is the column with the time attribute used for the window. -
This first
INTERVAL
is the amount of "hop" or advance of the window. -
The second
INTERVAL
is the size of the window.
Now, let’s move on to tumbling windows.
Tumbling windows
A tumbling window has a fixed length in size and has an advance that is the same amount of time as the size. Tumbling windows are considered a specialized case of hopping windows due to the advance equalling the window size.
Since a tumbling window starts a new one when the previous one ends, they don’t share any data. You won’t find records from one window in another one; the following illustration helps clarify this process:
Stepping through this illustration
-
Window one starts at 12:00:00 PM and will collect data until it ends. The endtime of the window is exclusive
-
At 12:01:00 PM window two starts collecting data
-
Since each window starts collecting data after the previous window ended, there are no shared results.
Kafka Streams tumbling window
For tumbling windows in Kafka Streams you’ll use TimeWindows class:
A Kafks Streams tumbling window example
KStream<String,Double> iotHeatSensorStream =
builder.stream("heat-sensor-input",
Consumed.with(stringSerde, doubleSerde));
iotHeatSensorStream.groupByKey()
.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
)
.aggregate(() -> new IotSensorAggregation(tempThreshold),
aggregator,
Materialized.with(stringSerde, aggregationSerde))
.toStream().to("sensor-agg-output",
Produced.with(windowedSerde, aggregationSerde))
- Using
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
without the.advanceBy
clause automatically makes this a tumbling window. Since you didn’t specify an advance, Kafka Streams will add one equal to the size. You could add theadvanceBy
clause with the same amount of time if you choose to skip the shortened version.
Flink SQL tumbling window
Tumbling windows in Flink SQL are defined similarly to the hopping variety, with a couple of differences
Tumbling window average with Flink SQL
SELECT window_start,
window_end,
device_id,
AVG(reading) AS avg_reading
FROM TABLE(TUMBLE
(TABLE device_readings,
DESCRIPTOR(timestamp),
INTERVAL '1' MINUTES
))
GROUP BY window_start,
window_end,
device_id
-
Specifying tumbling windows by passing the
TUMBLE
function to theTABLE
function. -
The table you’ll use as the source for the tumbling window aggregation.
-
The
DESCRIPTOR
is the column with the time attribute used for the window. -
By passing a single
INTERVAL
parameter, Flink SQL will utilize this as the advance and the size.
You define tumbling windows in Flink SQL similarly to Kafka Streams in that you only provide one time paramater.
Use cases
Hopping Windows
Let’s look at an illustration for the use case of hopping windows:
So, from looking at this image, we could generalize hopping windows as "every <time-period of window advance> give me <aggregate> over the last <window size> period". From our examples here, it would be "every 30 seconds, give me the average temp reading over the last minute". So, any problem domain where you want to closely monitor changes over time or compare changes relative to the previous reading could be a good fit for the hopping window.
It’s worth noting that Kafka Streams will emit periodic results before a window closes, while Flink SQL will only emit results when it’s closed. I’ll go into more details about this in the fourth installment of this blog series.
Tumbling Windows
For the tumbling window, we have another illustration:
This illustration for tumbling windows can be summarized as "give me <aggregate> every <window size period>" or restated to fit the examples in this post: "give me the average temp reading over the last minute". Since tumbling windows don’t share any records, any situation requiring a unique count of events per window period would be a reason to use a tumbling window.
Resouces
Subscribe via RSS