Next, let's look at Cloud Dataflow's windowing capabilities. This is really Dataflow's strength when it comes to streaming. Dataflow gives us three different types of windows: fixed, sliding, and sessions.

The first kind is fixed windows. Fixed windows are those that divide time into slices, for example, hourly, daily, or monthly. Fixed time windows consist of consistent, non-overlapping intervals. Sliding windows are those that you use for computations such as "give me five minutes' worth of data, and compute that every 30 minutes." Sliding time windows can overlap, for example, in a running average. Session windows are defined by a minimum gap duration, and the timing is triggered by another element. Session windows are for situations where the communication is bursty. It might correspond to a web session; an example might be a user who comes in, uses four or five pages, and leaves. You can capture that as a session window. Any key in your data can be used as a session key. A session window has a timeout period, and it will flush the window at the end of that timeout period.

Here is how we can set these different types of windows in Python. In the fixed window example, we can use the function beam.WindowInto with window.FixedWindows and an argument of 60 to get fixed windows starting every 60 seconds. In the second example, with a sliding time window, we use window.SlidingWindows with arguments 30 and 5. Here, the first argument refers to the length of the window, that is, 30 seconds, and the second argument refers to how often new windows open, that is, five seconds. Finally, we have the example of session windows. We use window.Sessions with an argument of 10 multiplied by 60 to define session windows with a timeout of ten minutes, that is, 600 seconds. The sketch below puts these three together.
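What follows is a minimal sketch of these three window settings using the Apache Beam Python SDK. The pipeline, the sensor readings, and the timestamps are made up purely for illustration; only the window types and their arguments come from the lesson.

```python
import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    # A tiny bounded stand-in for a streaming source, with event timestamps attached.
    items = (
        p
        | beam.Create([('sensor-1', 25.0, 10), ('sensor-1', 26.0, 95), ('sensor-2', 27.5, 130)])
        | beam.Map(lambda e: window.TimestampedValue((e[0], e[1]), e[2])))

    # Fixed windows: non-overlapping 60-second slices of event time.
    fixed = items | 'fixed' >> beam.WindowInto(window.FixedWindows(60))

    # Sliding windows: each window is 30 seconds long, and a new window opens every 5 seconds.
    sliding = items | 'sliding' >> beam.WindowInto(window.SlidingWindows(30, 5))

    # Session windows: a per-key window that closes after a 10-minute (600-second) gap.
    sessions = items | 'sessions' >> beam.WindowInto(window.Sessions(10 * 60))
```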
How does windowing work? All things being equal, this is how windowing ought to work: if there were no latency, if we lived in an ideal world where everything was instantaneous, then these fixed time windows would just flush at the close of the window. At the very microsecond at which the one-minute window ends, all the data is flushed out. That is only if there is no latency. But in the real world, latency happens. We have network delays, system backlogs, processing delays, pops of latency, and so on. So when do we want to close the window? Should we wait a little bit longer than a minute? Maybe a few more seconds?

As you can see here on the slide, we are using a one-minute window, and we expect data to be available in the respective windows. Data one is expected in window one, but in reality it arrives during window two. Data two arrives a bit late, outside window two. Data three looks okay. In this scenario, we need to consider the treatment of data one and data two. Our system should be able to find the difference between expected arrival time and actual arrival time. This is known as lag time. Lag time helps the system determine how late-arriving data should be handled. Dataflow can automatically keep track of this lag time with a technique known as a watermark. So what Dataflow is going to do is continuously compute the watermark, which is how far behind we are. Dataflow ordinarily is going to wait until the watermark it has computed has elapsed. So if it is running a system lag of three or four seconds, it is going to wait four seconds before it flushes the window, because this is when it believes all of the data should have arrived for that particular time period.

What then happens to late data? Let's say it gets an event with a timestamp of 8:04, but now it's 8:06. It is two minutes late, one minute after the close of the window. What does it do with that data? The answer is that you get to choose. The default is just to discard it, but you can also tell it to reprocess the window based on those late arrivals. Beam's default windowing configuration tries to determine when all the data has arrived, based on the type of data source, and then advances the watermark past the end of the window. This default configuration does not allow late data.

We know that data may arrive late in the system, and we may need to consider it in the aggregations. The default behavior is to trigger an aggregation at the watermark. Let us spend some time understanding how this works. Event time triggers operate on the date/time stamp associated with each element. The default trigger is of this type. The AfterWatermark trigger is the only event time trigger currently supported. Apache Beam determines when all the elements with a timestamp that falls within the window have been processed; this is the watermark. The passing of the watermark causes the aggregation step to be performed. After the watermark has passed, the default event time trigger is activated. Its behavior is to emit the results of the aggregation one time and discard any data that arrives late. In Java pipelines, you can override this behavior and do something with late data. In a Python pipeline, currently, the late data is discarded unconditionally.

Processing time triggers operate on the time at which an element is processed at some point in the pipeline, as determined by a system clock. You could set an AfterProcessingTime trigger on unbounded data contained in a global window. For example, emitting data every 30 seconds: the data never ends and the window never closes, but interim results are emitted every 30 seconds by the trigger. A data-driven trigger is associated with a condition on the data contained in the element itself. Currently, this simply counts each element that has been processed in the window. You could set AfterCount to 15, and every 15 elements processed would cause an emit. Composite triggers combine effects. For example, consider if you had a data-driven AfterCount trigger set to 15. Every 15 elements, it would emit. However, if there were 14 elements in the PCollection and no more data arrived, those 14 would sit in the window forever. In this case, you could add an event time trigger to ensure that the last 14 were serviced by an emit.

Now we know different techniques for handling the accumulation of late-arriving data. We also know that triggers are used to initiate the accumulation, and watermarks help in deciding the lag time and the related corrective actions for computing accumulations. On your screen, you can see code which creates a sample trigger; a similar sketch follows below. As you can see in the code, we are creating a sliding window of 60 seconds that slides every five seconds. The AfterWatermark function gives us details about when to trigger the accumulation. The code uses two options: first, early or speculative firing, which is set to 30 seconds; second, late firing for each late-arriving item.
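Here is a minimal sketch, assuming the Apache Beam Python SDK, of a trigger configured the way the slide describes. The name `events` is just a placeholder for a PCollection produced earlier in the pipeline, and the choice of ACCUMULATING is illustrative; accumulation modes are discussed shortly.

```python
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)

triggered = (
    events
    | 'window_and_trigger' >> beam.WindowInto(
        # 60-second windows, with a new window opening every 5 seconds.
        window.SlidingWindows(60, 5),
        # Fire speculatively 30 seconds of processing time into the pane (early),
        # fire when the watermark passes the end of the window,
        # and fire once more for each late element that still arrives (late).
        trigger=AfterWatermark(
            early=AfterProcessingTime(delay=30),
            late=AfterCount(1)),
        # The Python SDK expects an accumulation mode whenever a custom trigger is set.
        accumulation_mode=AccumulationMode.ACCUMULATING))
```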
The second code segment demonstrates a composite trigger. The composite trigger is activated either when one hundred elements are available for accumulation or every 60 seconds, irrespective of the watermark. This code segment uses a fixed window of one minute's duration. A sketch that combines this composite trigger with an explicit accumulation mode appears at the end of this section.

This is how the window reprocessing works. The late processing that I am describing only works in Java. The implementation of Apache Beam's watermark support is part of the open source software; it is not directly implemented by Google.

When you set a trigger, you need to choose either accumulate mode or discard mode. This example shows the different behaviors caused by the intersection of windowing, trigger, and accumulation mode. Here you can see temperatures divided into ten-minute windows, using a data-driven trigger with AfterCount 2. This means that after every two temperature readings, the window will get flushed. With discard mode, the user will only see two temperatures at a time. But if I set it to accumulate, I will see the original two, plus the next two, plus the next two. If you are doing element-wise processing, then it probably makes sense to discard, because once you have seen the data and used it, you are done with it. However, if you are doing something like computing a moving average, you might want all of the data to be included in each firing so that the latest results reflect it in the average.

One thing to remember when it comes to memory utilization on Dataflow workers: if you set the accumulation mode to accumulate, and you set the allowed lateness to, let's say, one hour, the worker needs to keep track of all the messages that arrived over the last hour in addition to the new messages.
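To close, here is a minimal sketch, again assuming the Apache Beam Python SDK, of a composite trigger like the one just described, together with an explicit accumulation mode. As before, `events` is a placeholder PCollection, and DISCARDING is chosen only to illustrate the contrast with ACCUMULATING.

```python
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime, Repeatedly)

composite = (
    events
    | 'composite_trigger' >> beam.WindowInto(
        # Fixed windows of one minute's duration.
        window.FixedWindows(1 * 60),
        # Keep firing for the life of the window, whenever either condition is met:
        # 100 elements have arrived, or 60 seconds of processing time have passed.
        trigger=Repeatedly(
            AfterAny(AfterCount(100), AfterProcessingTime(1 * 60))),
        # DISCARDING emits only the new elements at each firing;
        # ACCUMULATING would re-emit everything seen so far in the window.
        accumulation_mode=AccumulationMode.DISCARDING))
```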