A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner
In this tutorial, we show how to build a unified Apache Beam pipeline that runs in both batch and streaming modes with the DirectRunner. We generate event-time-aware synthetic data and apply fixed windows with triggers and allowed lateness to demonstrate how Apache Beam handles on-time and late events consistently. By switching only the input source, we keep the underlying aggregation logic identical, which makes it easy to see how Beam's event-time model, windows, and panes behave without relying on external streaming infrastructure.
!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone
We install the required dependencies, pinning the gRPC versions for compatibility so Apache Beam runs cleanly. We import the core Beam APIs along with the windowing, trigger, and TestStream utilities used later in the pipeline, plus the standard Python modules for time handling and JSON formatting.
MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}
base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())
BATCH_EVENTS = [
    make_event("u1", "purchase", 20, t0 + 5),
    make_event("u1", "purchase", 15, t0 + 20),
    make_event("u2", "purchase", 8, t0 + 35),
    make_event("u1", "refund", -5, t0 + 62),
    make_event("u2", "purchase", 12, t0 + 70),
    make_event("u3", "purchase", 9, t0 + 75),
    make_event("u2", "purchase", 3, t0 + 50),
]
We define the global configuration that controls the window size, allowed lateness, and execution mode. We create synthetic events with explicit event timestamps so that windowing behavior is deterministic and easy to reason about. The small dataset intentionally includes out-of-order and late events so we can observe Beam's event-time semantics.
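To see why the dataset above exercises event-time semantics, here is a minimal pure-Python sketch (no Beam) of how FixedWindows(60) assigns events to windows: each event belongs to the window starting at `event_time - (event_time % 60)`. The helper `window_start` is illustrative, not a Beam API, and we treat the base timestamp as 0 for readability (real fixed windows are epoch-aligned).

```python
WINDOW_SIZE_SECS = 60

def window_start(event_time_s, size=WINDOW_SIZE_SECS):
    """Return the start of the fixed window containing event_time_s."""
    return event_time_s - (event_time_s % size)

# The event offsets used in the tutorial, with the base timestamp taken as 0:
offsets = [5, 20, 35, 62, 70, 75, 50]
assignments = {off: window_start(off) for off in offsets}
print(assignments)
# The event at offset 50 belongs to the [0, 60) window even though it is
# listed after events from the [60, 120) window: it is out of order in
# event time, which is exactly what the late-data machinery must handle.
```

Note that the last event (offset 50) lands in the first window, so when it arrives after the watermark passes that window's end, Beam treats it as late data.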
def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }
class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
        windowed = stamped | beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
        counts = keyed | beam.combiners.Count.PerKey()
        sums = keyed | beam.CombinePerKey(sum)
        return (
            {"count": counts, "sum_amount": sums}
            | beam.CoGroupByKey()
            | beam.Map(format_joined_record)
        )
We build a reusable Beam PTransform that encapsulates all of the windowed aggregation logic. We apply fixed windows, triggers, and an accumulation mode, then key events by user and compute per-user counts and totals. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs.
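The CoGroupByKey step at the end of the transform can look opaque, so here is a small pure-Python sketch of the record shape it produces for `{"count": counts, "sum_amount": sums}`: each key maps to a dict with one list per tagged input, which `format_joined_record` (copied from the pipeline above) then flattens. The sample joined tuple is illustrative, matching what u1's first window would yield.

```python
def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }

# Shape CoGroupByKey would emit for u1 in the first window (two purchases,
# 20 + 15): each tag holds a list of values grouped under that key.
joined = ("u1", {"count": [2], "sum_amount": [35.0]})
print(format_joined_record(joined))
# {'user_id': 'u1', 'count': 2, 'sum_amount': 35.0}
```

The `if d["count"] else 0` guards matter because a key can appear in one tagged input but not the other, in which case that tag's list is empty.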
class AddWindowInfo(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        ws = float(window.start)
        we = float(window.end)
        yield {
            **element,
            "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
            "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
            "pane_timing": str(pane_info.timing),
            "pane_is_first": pane_info.is_first,
            "pane_is_last": pane_info.is_last,
        }
def build_test_stream():
    return (
        TestStream()
        .advance_watermark_to(t0)
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
            beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
            beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
        ])
        .advance_processing_time(5)
        .advance_watermark_to(t0 + 61)
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
            beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
            beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
        ])
        .advance_processing_time(5)
        .add_elements([
            beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
        ])
        .advance_watermark_to(t0 + 121)
        .advance_watermark_to_infinity()
    )
We enrich each output record with window and pane metadata so we can see exactly when and why each result is produced, converting Beam's internal timestamps to human-readable UTC times. We also define a TestStream that emulates real streaming behavior using watermark advances, processing-time advances, and late data.
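The on-time/late distinction in the TestStream comes down to one comparison, sketched below in pure Python (this is a simplification of Beam's internals, not its API): an element is late for its window once the watermark has passed the window's end. As before, we treat the base timestamp as 0 for readability.

```python
WINDOW_SIZE_SECS = 60

def pane_timing(event_time_s, watermark_s, size=WINDOW_SIZE_SECS):
    """Classify an element as ON_TIME or LATE relative to the watermark."""
    window_end = event_time_s - (event_time_s % size) + size
    return "LATE" if watermark_s >= window_end else "ON_TIME"

# In the TestStream above, the offset-50 element is added only after the
# watermark advanced to 61, past the end (60) of its [0, 60) window:
print(pane_timing(50, 61))
# The offset-70 element's window [60, 120) has not yet closed at watermark 61:
print(pane_timing(70, 61))
```

Because the pipeline sets `allowed_lateness=ALLOWED_LATENESS_SECS` (120 s), that late element still updates its window's result instead of being dropped.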
def run_batch():
    with beam.Pipeline(options=PipelineOptions([])) as p:
        (
            p
            | beam.Create(BATCH_EVENTS)
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )

def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (
            p
            | build_test_stream()
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )
run_stream() if MODE == "stream" else run_batch()
We wire everything together into runnable batch and streaming pipelines. We switch between modes by changing a single flag while reusing the same aggregation transform. We run the pipeline and print the results as JSON lines, making it easy to inspect the execution flow and output.
In conclusion, we demonstrate that the same Beam pipeline can process both bounded batch data and unbounded stream-like data while maintaining identical windowing and aggregation semantics. We observe how watermarks, triggers, and accumulation modes determine when results appear, and how late data updates previously emitted window results. By focusing on the conceptual foundations of the unified Beam model, we build a solid base for later extending the same design to real streaming sources and production environments.
Michel Sutter is a data science specialist and holds a Master's degree in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michel excels at transforming complex datasets into actionable insights.
2026-01-07 21:08:00