Apache Beam + Google Dataflow
=============================

- `General Links`_
- `What are Apache Beam and Google Dataflow?`_ (the very basics)
- `Apache Beam`_

  - `Pipeline`_
  - `PCollection`_
  - `PTransform`_

    - `Filter`_
    - `ParDo`_

      - `Additional outputs`_

    - `Composite transforms`_

- `Dataflow`_

  - `Setting the DataflowRunner and its configurations`_
  - `Monitoring Interface`_
  - `Error and exception handling`_

Note: References in [brackets] indicate PGB-specific examples of the indicated files/functions/etc. Most of these can be found at code path broker/beam/value_added/value_added.py or broker/beam/README.md.

--------------

General Links
-------------

- Apache Beam

  - **`Apache Beam Overview/Tutorial `__ Extremely useful!**
  - `Apache Beam SDK (2.25.0) for Python `__

- Dataflow

  - `Deploying a pipeline `__
  - Apache Beam ```DataflowRunner`` `__

--------------

What are Apache Beam and Google Dataflow?
-----------------------------------------

We use the `Apache Beam `__ Python SDK (`v2.25 docs `__) to run data processing and storage pipelines. The `Apache Beam Overview/Tutorial `__ is very good.

Apache Beam pipelines can be run in a variety of environments; environment-specific "**runners**\ " handle the pipeline execution. We use the `DataflowRunner `__ to execute our pipelines in the Google Cloud using their `Dataflow `__ service (see also `Deploying a pipeline `__). You can also use `DirectRunner `__ to execute the pipeline on a local machine (useful for testing).

Install the Beam Python SDK (including the Dataflow runner) with: ``pip install apache-beam[gcp]``

--------------

Apache Beam
-----------

Apache Beam runs data processing pipelines. Batch and streaming jobs are both supported under a single programming model. Which type of job is run depends only on the pipeline's initial data source (for a streaming job we must also set the pipeline's ``streaming`` option); the rest of the programming logic and syntax is agnostic to the job type.
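
As a minimal sketch of the runner and ``streaming`` options mentioned above, the following builds command-line style options and runs a tiny batch pipeline locally. The helper names ``build_argv`` and ``run_local_example`` and the sample data are hypothetical and used only for illustration; ``--runner`` and ``--streaming`` are standard Beam pipeline options. This is not how our production job is launched.

```python
def build_argv(runner="DirectRunner", streaming=False):
    """Return command-line style pipeline options (hypothetical helper)."""
    argv = [f"--runner={runner}"]
    if streaming:
        # Needed when the first step reads from an unbounded (streaming) source.
        argv.append("--streaming")
    return argv


def run_local_example():
    """Run a tiny batch pipeline on the local machine.

    Requires the Beam SDK (``pip install apache-beam``).
    """
    # Imported here so build_argv can be used without the SDK installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(build_argv())
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create numbers" >> beam.Create([1, 2, 3])  # bounded source: batch job
            | "Double" >> beam.Map(lambda x: 2 * x)
            | "Print" >> beam.Map(print)
        )
```

Calling ``run_local_example()`` executes the pipeline with the ``DirectRunner``. Swapping the runner name in ``build_argv`` is the only change to this sketch needed to target another runner, although Dataflow requires additional options (project, region, and so on) that are not shown here.
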

Here we outline the essential concepts needed to program with Beam. *Much of it is quoted directly from `Apache Beam Overview/Tutorial `__.*

``Pipeline``
~~~~~~~~~~~~~

`Programming guide: Create a pipeline `__

- The ``Pipeline`` abstraction encapsulates all the data and steps in the data processing task.
- The "driver program" [``value_added/value_added.py``\ ] creates a ``Pipeline`` object [``pipeline``, within ``run()``] and uses it as the basis for creating the pipeline’s data (``PCollection``\ s) and its operations (``PTransform``\ s).
- The general form of a step in the pipeline is (here, brackets indicate general Beam objects, not PGB-specific objects):

  - ``[Output PCollection] = [Input PCollection] | [Transform]``

``PCollection``
~~~~~~~~~~~~~~~~~

`Programming guide: PCollections `__

- A ``PCollection`` represents a distributed data set that the Beam pipeline operates on.
- ``PCollection``\ s are the inputs and outputs for each step in the pipeline.

  - The exception is the first step of the pipeline, where we must pass the ``Pipeline`` object itself to a ``Read`` transform to create the initial collection [``PSin``\ ].

- The elements of a ``PCollection`` may be of any type, but must all be of the same type. *Our current pipeline uses dictionaries for the alert data and most of its child/downstream collections.*
- A ``PCollection`` is immutable. A Beam Transform might process each element of a PCollection and generate new pipeline data (as a new ``PCollection``), *but it does not consume or modify the original input collection*.

``PTransform``
~~~~~~~~~~~~~~~~

`Programming guide: PTransform `__

- A ``PTransform`` represents a data processing operation, or a step, in the pipeline. I/O (read/write) operations are special cases of ``PTransform``\ s.
- Every ``PTransform`` takes one or more ``PCollection`` objects as input, performs some processing with/on the elements of that ``PCollection``, and produces zero or more output ``PCollection`` objects.
- To invoke a transform, we apply it (using the pipe operator ``|``) to the input ``PCollection``. This takes the general form (here, brackets indicate general Beam objects, not PGB-specific objects):

  - ``[Output PCollection] = [Input PCollection] | [Transform]``

- We can decorate the transform (using the ``>>`` operator) with a name that will show up in Dataflow:

  - ``"My Transform Name" >> [Transform]``

- We can also chain transforms together without explicitly naming the output collections.

  - ``[Final Output PCollection] = ([Initial Input PCollection] | [1st Transform] | [2nd Transform])``

- How we apply the pipeline’s transforms to its various collections determines the structure of our pipeline.

  - The best way to think of the pipeline is as a directed acyclic graph, where ``PTransform`` nodes are subroutines that accept ``PCollection`` nodes as inputs and emit ``PCollection`` nodes as outputs.
  - Dataflow provides a "job graph" visualization of the pipeline. See `here `__ for the job graph in production [defined in ``ztf-beam.py``] at the time of this writing (the interface is described in the `Dataflow <#dataflow>`__ section below).

- Built-in transforms (complete lists):

  - `Built-in I/O Transforms `__ (also see `Pipeline I/O Overview `__)
  - `Python transform catalog `__

**Two important transforms in our pipeline are:**

``Filter``
^^^^^^^^^^^

`Programming guide: Filter `__

Given a predicate, filter out all elements that don't satisfy the predicate.

1. We write a function ([``is_extragalactic_transient``\ ]) which operates on a single element of the input collection and returns ``True`` if the element meets our condition(s) and ``False`` otherwise.
2. We apply our function as a filter on the pipeline by passing it to the ``Filter`` transform:

   - [``ExgalTrans = alertDicts | apache_beam.Filter(is_extragalactic_transient)``\ ]

``ParDo``
^^^^^^^^^

`Programming guide: ParDo `__

See also:

- `Transforms: ParDo `__

Generic parallel processing.

1. We write a function which:

   - performs some data processing (e.g., fit the data using Salt2) on a single element of the input collection, and
   - returns a list containing zero or more elements, each of which will become an element of the output collection.

2. We name that function ``process`` and wrap it in an arbitrarily-named class [``fitSalt2``\ ] (subclass of ``DoFn``).
3. We apply our function to each element of the step's input ``PCollection`` by passing an instance of the class to the ``ParDo`` transform:

   - [``salt2Dicts = ExgalTrans | apache_beam.ParDo(fitSalt2())``\ ].

Additional outputs
'''''''''''''''''''

`Programming guide: Additional outputs `__

See also:

- `Example: multiple\_output\_pardo.py `__

``ParDo`` (or the ``DoFn`` passed to it) can produce more than one output PCollection. The main output should be returned as normal(\*); additional outputs should be tagged using ``apache_beam.pvalue.TaggedOutput('tag',element)``. See the examples in the links above, and [the ``FitSalt2`` (``DoFn``) class in `beam\_helpers/salt2\_utils.py `__].

(\*) We typically use ``return`` statements in our ``DoFn``\ s, but we also have the option of using ``yield`` statements (making the ``DoFn`` a generator). However, to return *multiple outputs* we must use ``yield`` statements.

Composite transforms
^^^^^^^^^^^^^^^^^^^^^

`Programming guide: Composite transforms `__

See also:

- `Creating composite transforms `__
- Example in: `ptransform_fn `__

To make the pipeline structure clearer and more modular, we can group multiple transforms into a single composite transform. We do this by creating a subclass of the ``PTransform`` class and overriding the ``expand`` method to specify the actual processing logic. We can then use this transform just as we would a built-in transform from the Beam SDK. See the links above and [the ``Salt2`` composite transform at code path broker/beam/value_added/value_added.py].
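
The transforms described above (``Filter``, ``ParDo`` with an additional tagged output, and a composite transform) can be sketched together as follows. This is an illustrative sketch, not the PGB pipeline: the names ``is_bright``, ``add_flux``, ``route``, ``Route``, ``BrightWithFlux``, the ``mag`` field, and the magnitude cuts are all hypothetical. The per-element logic is kept in plain functions so it can be tested without running a pipeline.

```python
def is_bright(alert):
    """Predicate for Filter: keep alerts brighter than a made-up magnitude cut."""
    return alert["mag"] < 19.0


def add_flux(alert):
    """Return a new dict with a derived field; the input element is not modified."""
    return {**alert, "flux": 10 ** (-0.4 * alert["mag"])}


def route(alert):
    """Return the output tag for an alert; None means the main output."""
    return "very_bright" if alert["mag"] < 17.0 else None


def run_example(alerts):
    """Build and run the sketch locally; requires the Beam SDK."""
    # Imported here so the helpers above can be tested without the SDK installed.
    import apache_beam as beam
    from apache_beam import pvalue

    class Route(beam.DoFn):
        """DoFn whose process method yields to the main or a tagged output."""

        def process(self, alert):
            tag = route(alert)
            if tag is None:
                yield alert  # main output
            else:
                yield pvalue.TaggedOutput(tag, alert)  # additional output

    class BrightWithFlux(beam.PTransform):
        """Composite transform: groups a Filter step and a Map step."""

        def expand(self, pcoll):
            return (
                pcoll
                | "Keep bright" >> beam.Filter(is_bright)
                | "Add flux" >> beam.Map(add_flux)
            )

    with beam.Pipeline() as pipeline:
        results = (
            pipeline
            | "Create alerts" >> beam.Create(alerts)
            | "Bright with flux" >> BrightWithFlux()
            | "Route" >> beam.ParDo(Route()).with_outputs("very_bright", main="bright")
        )
        # Each tagged output is its own PCollection.
        results.bright | "Print bright" >> beam.Map(print)
        results.very_bright | "Print very bright" >> beam.Map(
            lambda a: print("very bright:", a)
        )
```

For example, ``run_example([{"mag": 16.0}, {"mag": 18.0}, {"mag": 20.0}])`` drops the faintest alert in the ``Filter`` step and sends the remaining two to different outputs of the ``ParDo`` step.
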

--------------

Dataflow
--------

`Dataflow `__ is a Google service that runs Apache Beam pipelines on the Google Cloud Platform (GCP). `Deploying a pipeline `__ is a good place to start.

Dataflow handles the provisioning and management of all GCP resources (e.g., Compute Engine virtual machines or "workers"), and `autoscales `__ resources based on the (streaming) pipeline's current backlog and the workers' CPU usage over the last couple of minutes.

Setting the DataflowRunner and its configurations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

We tell the Beam pipeline to run on Dataflow by setting it as the "runner". The runner, and its configuration options, are set when creating the Beam ``Pipeline`` object. We pass them in as command line arguments when starting the job [see the file at code path broker/beam/README.md].

- ``--runner=DataflowRunner`` runs the job in the Google Cloud via Dataflow
- See `Pipeline options for the Cloud Dataflow Runner `__ for a complete list of Dataflow runner configuration options.

Monitoring Interface
~~~~~~~~~~~~~~~~~~~~

Dataflow also provides us with a nice `monitoring interface `__ [see `here `__ for the job in production at the time of this writing]. There we can see:

- A graphical representation of pipelines.
- Details about the job's status and execution.
- Errors, warnings, and additional diagnostics. Links to the complete logs.
- Monitoring charts with job-level and step-level metrics.

Error and exception handling
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`GCP docs: Error and exception handling `__

Quoted directly from the link, with emphasis added:

"Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.
**Dataflow processes elements in arbitrary bundles, and retries the complete bundle when an error is thrown for any element in that bundle.** When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. **When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.**"
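
Because a permanently failing element can stall a streaming job, one common way to guard against this is to catch exceptions inside the per-element function and route failures to a separate collection instead of raising. The following is a minimal sketch of that idea under stated assumptions, not a description of how our pipeline handles errors: the names ``safe_parse``, ``by_status``, and ``run_example`` are hypothetical, and JSON parsing stands in for whatever processing might fail.

```python
import json


def safe_parse(raw):
    """Parse one JSON message; never raise, return (status, payload) instead."""
    try:
        return ("ok", json.loads(raw))
    except (TypeError, ValueError) as exc:
        # json.JSONDecodeError is a subclass of ValueError.
        return ("dead_letter", {"raw": raw, "error": str(exc)})


def by_status(result, num_partitions):
    """Partition function: 0 for parsed elements, 1 for failures."""
    return 0 if result[0] == "ok" else 1


def run_example(messages):
    """Run the sketch locally; requires the Beam SDK."""
    # Imported here so the helpers above can be tested without the SDK installed.
    import apache_beam as beam

    with beam.Pipeline() as pipeline:
        parsed, failed = (
            pipeline
            | "Create messages" >> beam.Create(messages)
            | "Safe parse" >> beam.Map(safe_parse)
            | "Split by status" >> beam.Partition(by_status, 2)
        )
        parsed | "Print parsed" >> beam.Map(lambda r: print("parsed:", r[1]))
        # In a real job the failures would be written somewhere for inspection.
        failed | "Print failed" >> beam.Map(lambda r: print("failed:", r[1]))
```

With this pattern a malformed element no longer raises inside the bundle, so the bundle is not retried because of it; the trade-off is that failures become silent unless the failure collection is stored or monitored.
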