Monitoring your Dataflow pipelines: an overview

Jérémie Gomez
Google Cloud - Community
Feb 23, 2021

Apache Beam is a programming model that lets you implement batch and streaming data processing jobs that can run on multiple execution engines, including Dataflow, the execution engine created by Google and available on the Google Cloud Platform.

You have decided to use Dataflow to run your Beam pipelines, great choice! You need to know if your pipeline operates properly, especially if you chose a streaming pipeline. As these jobs run continuously, they should be continuously monitored.

If you feel a bit lost in choosing your monitoring strategy because Dataflow can leverage multiple types of metrics and logs, don’t worry! This article gives an overview of the different metrics and logs you can use on Google Cloud Platform to monitor your Dataflow jobs. The goal of this post is not to get into the detail of each metric but to give you a sense of the landscape of the metrics & logs that are available to you.

For this post, I will monitor a streaming Dataflow job that reads from a Pub/Sub subscription and writes to a BigQuery table with streaming inserts. You can easily run the same job without writing any code by using the Streaming Data Generator template and the Pub/Sub subscription to BigQuery template.

Choose your Cloud Monitoring metrics

Cloud Monitoring is the native solution on GCP for all your metrics and alerts. It is integrated with most products in GCP, and Dataflow is of course no exception.

In the context of Dataflow, Cloud Monitoring offers multiple types of metrics:

  • Standard metrics
  • VM (GCE) metrics
  • Monitoring agent metrics
  • Custom metrics
  • Additional metrics

Standard metrics

The standard metrics will give you a lot of information about your job. They start with “dataflow.googleapis.com/job/”.

Some frequent monitoring questions that can be answered by standard metrics include:

  • Has my job failed? dataflow.googleapis.com/job/is_failed
  • How long is my job taking to process messages? dataflow.googleapis.com/job/system_lag
  • Is there a sudden spike in processing? dataflow.googleapis.com/job/current_num_vcpus to know if your job scaled a lot, dataflow.googleapis.com/job/element_count, dataflow.googleapis.com/job/elements_produced_count (throughput), etc.
  • Is the data processed fresh? dataflow.googleapis.com/job/per_stage_data_watermark_age or dataflow.googleapis.com/job/data_watermark_age (a.k.a. data freshness)

Some metrics are a function of time and are useful for estimating real-time costs, such as:

  • dataflow.googleapis.com/job/total_memory_usage_time
  • dataflow.googleapis.com/job/total_vcpu_time
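
As a rough illustration of how these two cumulative metrics can be turned into a cost estimate, here is a minimal Python sketch. It assumes that total_vcpu_time is expressed in vCPU-seconds and total_memory_usage_time in GB-seconds. The function name and the hourly prices are hypothetical placeholders, not real Dataflow pricing, so substitute the rates that apply to your region and job type.

```python
def estimate_job_cost(vcpu_seconds, memory_gb_seconds,
                      price_per_vcpu_hour, price_per_gb_hour):
    """Estimate compute cost from cumulative vCPU and memory usage time."""
    vcpu_hours = vcpu_seconds / 3600.0
    memory_gb_hours = memory_gb_seconds / 3600.0
    return vcpu_hours * price_per_vcpu_hour + memory_gb_hours * price_per_gb_hour


# Example: 1 vCPU and 3.75 GB of memory used for 2 hours,
# with made-up prices (NOT real Dataflow pricing).
cost = estimate_job_cost(
    vcpu_seconds=2 * 3600,
    memory_gb_seconds=3.75 * 2 * 3600,
    price_per_vcpu_hour=0.10,
    price_per_gb_hour=0.01,
)
print(f"Estimated cost: {cost:.3f}")
```

This only covers vCPU and memory; other billed resources, such as disks, would need their own terms.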

Additional metrics are available if your job reads from Pub/Sub, such as dataflow.googleapis.com/job/pubsub/read_latencies.

You can have a look at the full list of standard metrics.

Here are a few of these metrics set up in a dashboard in Cloud Monitoring for my Pub/Sub to BigQuery job.

Cloud Monitoring dashboard with some interesting standard metrics.

This dashboard is only made of a few standard metrics and you can already see things like:

  • The job has not failed (even though this metric will remain at 0 for streaming jobs anyway, since they keep retrying)
  • Performance: the system lag oscillates around 20 seconds, with data freshness being about the same. The job did not need to autoscale (only 1 vCPU).
  • Pub/Sub behaviour: by filtering the elements_produced metric on the PCollection “PubsubIO.Read/PubsubUnboundedSource.out0”, we can see the throughput of reading from Pub/Sub. It is about 100 messages/sec, i.e. the same rate at which messages are being published.
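
To make the filtering above more concrete, here is a small Python sketch that builds a Cloud Monitoring filter string for a standard Dataflow metric. The helper name and the job name are hypothetical, and the label names (job_name on the dataflow_job resource, pcollection on the metric) are my assumption of how these metrics are labelled, so check them in the Metrics Explorer before relying on them.

```python
def dataflow_metric_filter(metric, job_name, pcollection=None):
    """Build a Cloud Monitoring filter string for a Dataflow job metric."""
    clauses = [
        f'metric.type = "dataflow.googleapis.com/job/{metric}"',
        'resource.type = "dataflow_job"',
        f'resource.labels.job_name = "{job_name}"',
    ]
    if pcollection is not None:
        # Assumed metric label: restricts the series to one PCollection.
        clauses.append(f'metric.labels.pcollection = "{pcollection}"')
    return " AND ".join(clauses)


# "my-streaming-job" is a made-up job name.
throughput_filter = dataflow_metric_filter(
    "elements_produced_count",
    "my-streaming-job",
    pcollection="PubsubIO.Read/PubsubUnboundedSource.out0",
)
print(throughput_filter)
```

The resulting string can be pasted into a dashboard or alert definition, or passed to the Monitoring API when listing time series.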

GCE metrics

Dataflow creates its worker machines as Google Compute Engine virtual machines. Hence, you can use all the GCE metrics to know more about the state of your worker machines. They start with “compute.googleapis.com/”.

Dataflow will automatically create two labels on the VMs it creates: dataflow_job_id and dataflow_job_name. As a consequence, you can easily filter GCE metrics by job ID or job name just like you do with standard metrics.

Some frequent monitoring cases that can be answered by GCE metrics are:

  • How much of my CPU is being utilised? compute.googleapis.com/instance/cpu/utilization
  • How much disk write is happening? compute.googleapis.com/instance/disk/write_bytes_count
  • How used are my disks? compute.googleapis.com/guest/disk/bytes_used
  • How much of my RAM is used? compute.googleapis.com/instance/memory/balloon/ram_used (only for machines of the E2 family)

You can have a look at the full list of GCE metrics.

Here are a few of these GCE metrics set up in a dashboard in Cloud Monitoring for my Pub/Sub to BigQuery job.

Cloud Monitoring dashboard with GCE metrics

We can see that CPU utilisation was quite low. There was also a burst of disk IO when the job started reading from Pub/Sub, after which disk IO returned to a low value. The VM memory chart is blank because I use n1-standard-4 machines, and this metric is currently available only for machines of the E2 family.

Monitoring agent metrics

If the GCE metrics are not enough, you can install the Monitoring agent on the VMs. The Monitoring agent gathers system and application metrics from virtual machine instances and sends them to Cloud Monitoring. They start with “agent.googleapis.com/”.

On Dataflow, in order to install the agent on your workers, you only need to use this pipeline parameter when running your job:
--experiments=enable_stackdriver_agent_metrics

One metric you might be interested in from the agent is the percentage of RAM used, if you are not using an E2 machine: agent.googleapis.com/memory/percent_used.

You can have a look at the full list of the Monitoring agent metrics.

Custom metrics

Beam lets you create your own metrics from your code. There are three types of metric:

  • Counter: can be incremented and decremented.
  • Distribution: takes multiple values, and will expose statistics about the distribution of these values.
  • Gauge: a scalar metric that is aggregated by taking the last reported value.

Have a look at the section on metrics of the Beam programming guide for more information on how to implement them in your code. Once you have coded your metrics, they become available in Cloud Monitoring and they start with “custom.googleapis.com/dataflow”.

Dataflow does not support gauges; you can use counter and distribution custom metrics.
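
To illustrate how the three metric types aggregate values, here is a toy Python model. The classes below are hypothetical stand-ins written for this post and are not the Beam SDK classes; they only mimic the semantics described above (a running total, summary statistics, and a last-value scalar).

```python
class CounterModel:
    """Running total that can go up or down."""

    def __init__(self):
        self.value = 0

    def inc(self, n=1):
        self.value += n

    def dec(self, n=1):
        self.value -= n


class DistributionModel:
    """Keeps summary statistics about all the values it has seen."""

    def __init__(self):
        self.count = 0
        self.sum = 0
        self.min = None
        self.max = None

    def update(self, value):
        self.count += 1
        self.sum += value
        self.min = value if self.min is None else min(self.min, value)
        self.max = value if self.max is None else max(self.max, value)

    @property
    def mean(self):
        return self.sum / self.count if self.count else None


class GaugeModel:
    """Scalar that only remembers the last reported value."""

    def __init__(self):
        self.value = None

    def set(self, value):
        self.value = value


# Made-up message sizes, in bytes.
messages_seen = CounterModel()
message_size = DistributionModel()
last_size = GaugeModel()
for size in [120, 80, 200]:
    messages_seen.inc()
    message_size.update(size)
    last_size.set(size)

print(messages_seen.value, message_size.min, message_size.max, last_size.value)
```

In a real pipeline you would use the metric objects provided by the Beam SDK instead, and let the runner do the aggregation across workers.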

The BigQueryIO sink already implements a custom metric of type counter called “bytes_written”. Here is what it looks like, this time in the Metrics Explorer.

Cloud Monitoring metrics explorer: bytes_written custom metric

Since my job is doing streaming inserts into BigQuery, we see a nice linear function of bytes written.

Additional metrics

Some metrics on GCP will apply to most resources. One example is logging metrics.

Your Dataflow job will automatically write logs, and you can also write your own custom logs (see the Cloud Logging section below). You can use logging metrics to monitor the amount of logs your job generates, and more. They start with “logging.googleapis.com/”.

For instance, logging.googleapis.com/byte_count will help you know the volume of logs that have been written.

Cloud Monitoring metrics explorer: byte_count logging metric

We can see that many logs were generated only at the beginning of the job. It is important to monitor log volume while the job runs in order to avoid unexpected costs.

Set up alerts on these metrics

We have seen at least five types of metrics for Dataflow, each with its own use.

Before you set up the alerts, think about your dependencies too. Your Dataflow job might read some data from a Redis database in Memorystore. In this case you need to find the right metrics to monitor your Memorystore. This way, if two alerts happen both in your Dataflow job and in your Memorystore, you can start investigating this correlation straight away.

Once you have chosen the metrics that are of interest, or even created your own metrics, Cloud Monitoring enables you to create alerts.

The alerts consist of policies based on metrics (e.g. when my CPU utilisation > 95% for more than 5 minutes) and they are propagated to notification channels like emails, Slack channels, SMS, etc.
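
As a sketch of what such a policy looks like, the Python snippet below builds the JSON body of an alerting policy for the example condition (CPU utilisation above 95% for more than 5 minutes). The field names follow my understanding of the Cloud Monitoring AlertPolicy resource; the function name, display names and job name are hypothetical, and filtering on the dataflow_job_name VM label through metadata.user_labels is an assumption to verify in your project.

```python
import json


def cpu_alert_policy(job_name, threshold=0.95, duration_seconds=300):
    """Build an alerting policy body for high CPU on a job's worker VMs."""
    metric_filter = (
        'metric.type = "compute.googleapis.com/instance/cpu/utilization" '
        'AND resource.type = "gce_instance" '
        # Assumption: the label Dataflow puts on its VMs is exposed here.
        f'AND metadata.user_labels.dataflow_job_name = "{job_name}"'
    )
    return {
        "displayName": f"High CPU on Dataflow workers ({job_name})",
        "combiner": "OR",
        "conditions": [
            {
                "displayName": "CPU utilisation above threshold",
                "conditionThreshold": {
                    "filter": metric_filter,
                    "comparison": "COMPARISON_GT",
                    "thresholdValue": threshold,  # utilisation is a fraction
                    "duration": f"{duration_seconds}s",
                },
            }
        ],
    }


# "my-streaming-job" is a made-up job name.
policy = cpu_alert_policy("my-streaming-job")
print(json.dumps(policy, indent=2))
```

Notification channels are left out here; they would be attached to the policy separately.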

Choosing Pub/Sub as your notification channel helps you integrate with third-party alerting tools. It can also enable you to trigger Cloud Functions in order to automatically execute an action (e.g. changing your maximum allowed number of workers, restarting a service) in reaction to an alert.
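
Here is a minimal sketch of what the entry point of such a function could look like in Python. It assumes the alert arrives as a base64-encoded JSON document in the "data" field of the Pub/Sub event, and that the document contains an "incident" object with "state" and "policy_name" fields. That payload shape, the function name and the returned strings are assumptions for illustration, so check the actual notification format before using it.

```python
import base64
import json


def handle_alert(event, context=None):
    """Decode a Pub/Sub-delivered alert and decide whether to act on it."""
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    incident = payload.get("incident", {})
    if incident.get("state") == "open":
        # Placeholder: this is where the remediation action would go.
        return f"act: {incident.get('policy_name', 'unknown policy')}"
    return "ignore"


# Made-up notification, encoded the way Pub/Sub would deliver it.
sample = {"incident": {"state": "open", "policy_name": "High system lag"}}
event = {"data": base64.b64encode(json.dumps(sample).encode("utf-8")).decode("ascii")}
print(handle_alert(event))
```

Acting only on open incidents avoids triggering the remediation a second time when the incident closes.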

Even though you can set alerts in the Cloud Console UI, it is of course recommended to set up your alerts in your Infrastructure as Code pipeline, for instance with Terraform.

Once you have set up alerts, you can also set up dashboards and groups to improve the visual representation of your metrics. The Dataflow UI (see below) will also show you the most important metrics in a nice visual way.

Use Cloud Logging and logs-based metrics

While Cloud Monitoring is all about metrics, Cloud Logging is all about… logs. Cloud Logging can be a critical tool in understanding what your job is doing.

Native logs

Dataflow generates logs that flow to Cloud Logging by default. You can control the log level for Dataflow.

You can filter Dataflow logs in either of two ways:

  • On the resource.type key, with the value dataflow_step.
  • On the logName key, with the value projects/{project-id}/logs/dataflow.googleapis.com%2FX, where X is any of: agent, docker, job-message, jvm-gc, kubelet, resource, shuffler, shuffler-startup, system, vm-health, vm-monitor, worker, worker-startup.

This shows that multiple components of Dataflow generate logs; most of the time you will be interested in the job-message and worker logs, but other logs might come in handy.
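
As an example, the following Python sketch assembles a Cloud Logging filter from the two keys above. The helper name, project ID and job name are hypothetical, and the extra clause on resource.labels.job_name assumes the dataflow_step resource carries a job_name label.

```python
def dataflow_log_filter(project_id, job_name, log_type="worker", min_severity=None):
    """Build a Cloud Logging filter for one Dataflow log type of one job."""
    clauses = [
        'resource.type="dataflow_step"',
        # Assumed resource label used to narrow the logs to a single job.
        f'resource.labels.job_name="{job_name}"',
        f'logName="projects/{project_id}/logs/dataflow.googleapis.com%2F{log_type}"',
    ]
    if min_severity is not None:
        clauses.append(f"severity>={min_severity}")
    return " AND ".join(clauses)


# "my-project" and "my-streaming-job" are made-up identifiers.
log_filter = dataflow_log_filter(
    "my-project", "my-streaming-job", log_type="job-message", min_severity="WARNING"
)
print(log_filter)
```

The resulting string can be pasted into the Logs Explorer query box.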

Custom logs

It is very simple to generate your own logs by using a logger in your Beam code. In Java, the recommended way is to use SLF4J. Your custom logs will appear in Cloud Logging as Dataflow worker logs.

Custom logs are important to give visibility into the business logic your job is performing, and will help you debug. Choose the log level in your code wisely (LOG.info, LOG.debug, etc.) so that the logs you need exist without too many being generated in the nominal case.
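
Here is a small sketch of this idea, assuming a Python pipeline where the standard logging module plays the role that SLF4J plays in Java. The logger name and the parsing function are hypothetical. The nominal path logs at debug level only, while the abnormal path logs a warning.

```python
import json
import logging

LOG = logging.getLogger("my_pipeline")  # hypothetical logger name


def parse_message(raw):
    """Parse a JSON message, logging loudly only when something is wrong."""
    try:
        record = json.loads(raw)
    except ValueError:
        # Abnormal case: worth a warning, truncated to keep log volume low.
        LOG.warning("Wrongly formatted message: %r", raw[:100])
        return None
    # Nominal case: debug level, so it is filtered out unless requested.
    LOG.debug("Parsed message of %d characters", len(raw))
    return record


logging.basicConfig(level=logging.INFO)
good = parse_message('{"user": "alice", "amount": 12}')
bad = parse_message("not json at all")
```

With the INFO level configured here, only the warning for the second message is emitted.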

Including job-message logs for a specific job name in the Cloud Logging logs explorer

Logs-based metrics

This is where Cloud Monitoring and Cloud Logging come together. In Cloud Logging, you can set up a metric based on logs (counter or distribution metric). This metric will then appear like any other metric in Cloud Monitoring, and you can set up alerts on it. It will start with “logging.googleapis.com/user”.

The astute reader might argue that custom metrics and logs-based metrics based on custom logs look like they solve the same problem. Let’s say that your job reads messages from Pub/Sub and you want to be alerted if more than 10 messages in a minute are wrongly formatted. You could:

  • Use a Beam Counter custom metric that you would increment each time you encounter a wrongly formatted message.
  • Generate a custom log “Wrongly formatted message” for each one, and set up a logs-based metric on this log.

Both are viable solutions. These constraints might help you choose:

  • Dataflow reports custom metrics to Monitoring approximately every 30 seconds. The reported values for these metrics are approximate.
  • Log ingestion time is usually low but you may find it varies more than metrics ingestion time, hence making your time to alert somewhat less predictable. The best is to measure it yourself on your use case. Logs-based metrics are also a little simpler since you do not have to modify your Beam code.
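
Whichever option you choose, the condition being evaluated is the same: count the wrongly formatted messages per minute and compare the count to a threshold. The Python sketch below reproduces that logic locally on a list of timestamps, which can be handy for reasoning about the threshold. The function name and the sample data are made up, and in practice Cloud Monitoring performs this evaluation for you.

```python
from collections import Counter


def minutes_over_threshold(malformed_timestamps, threshold=10):
    """Return the minute buckets with more than `threshold` malformed messages."""
    # Bucket each timestamp (in seconds) into its minute.
    per_minute = Counter(int(ts // 60) for ts in malformed_timestamps)
    return sorted(minute for minute, count in per_minute.items() if count > threshold)


# Made-up data: 11 malformed messages in minute 0, then 5 in minute 1.
timestamps = list(range(11)) + [60 + i for i in range(5)]
alerting_minutes = minutes_over_threshold(timestamps)
print(alerting_minutes)
```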

Leverage the Dataflow UI

Setting up alerts on metrics is very important for monitoring your jobs. However, what if you are trying to investigate how your job is doing? Or maybe you just received an alert and would like to understand better what is going on. You can directly leverage the Dataflow UI to look at your logs, and to see important metrics for your pipeline, all in one single UI.

The Job graph tab shows you the structure of your graph, as well as some metrics. Click on each stage to get more metrics on this stage, in particular:

  • Elements added (which is the same as the element_count metric) for input and output collections
  • Throughput (which is the same as the elements_produced_count metric) for input and output collections
  • Output data freshness (which is the same as the per_stage_data_watermark_age metric)

Job Graph tab: DAG & metrics

The Job metrics tab will show you some metrics that we have already mentioned, for the whole job (not for each stage).

A useful feature here is the “Create alerting policy” link. It is a nice shortcut to create an alert on the metric that is used to display a particular graph.

Job Metrics tab

Finally, opening up the log panel from the bottom will show your Dataflow logs. Only job logs and worker logs will be displayed here. If you need to see the other log types, you need to use the Cloud Logging interfaces.

Make sure you notice the dropdown menu to choose the level of logs you want to display. Here, I display all logs up to the debug level.

Logs are displayed at the bottom of the Dataflow UI.

What’s next

Google’s SRE (Site Reliability Engineering) workbook contains a chapter about data processing pipelines that will help you to:

  • Have a framework for thinking about how your pipelines could fail (delayed data, corrupt data), and why they could fail (pipeline dependencies, application & configuration, unexpected resource growth, outage).
  • Define Service Level Objectives that are fundamental for monitoring an application.
  • Think more broadly about how to operate these pipelines.

Monitoring can sometimes be neglected and only looked at when things start to go wrong, but it is actually essential to ensure the availability and reliability of your data processing pipelines. It’s really important to think of monitoring as an integral part of developing your pipelines, and this article shows how easy it is to set this up.
