Guide April 18, 2026 · 18 mins · The D23 Team

Google Cloud Logging for Data Pipeline Observability

Master Google Cloud Logging for unified observability across BigQuery, Dataflow, and Composer. Real-world strategies for production data pipelines.


Understanding Google Cloud Logging in Data Pipelines

Data pipelines are the arteries of modern analytics. They move terabytes of information through BigQuery, transform data with Dataflow, and orchestrate workflows with Cloud Composer—but only if you can see what’s happening inside them. That’s where Google Cloud Logging comes in.

Google Cloud Logging is GCP’s centralized log management service that ingests, stores, and analyzes logs from virtually every service in your data stack. Unlike scattered log files on individual servers or application-level logging that only captures business events, Cloud Logging gives you a unified view of system events, application errors, audit trails, and custom metrics across your entire data infrastructure.

For data engineering teams managing production pipelines, this matters enormously. A Dataflow job that silently fails at 3 AM, a BigQuery query that runs 10x slower than expected, or a Composer DAG that skips tasks without alerting—these aren’t theoretical problems. They’re the kinds of issues that cascade into stale dashboards, missed SLAs, and data quality disasters. Implementing observability in GCP requires more than just turning on logging; it requires understanding how to instrument each component of your pipeline and correlate signals across services.

Cloud Logging sits at the center of that strategy. It’s not a replacement for application-specific monitoring or custom dashboards—it’s the foundation that makes everything else possible. When you’re running analytics at scale, you need to know the difference between a transient network blip and a structural problem in your pipeline. You need to trace a data quality issue back to its source. You need alerts that actually mean something, not noise.

This article walks you through how to instrument Google Cloud Logging across BigQuery, Dataflow, and Composer, then how to actually use those logs to understand what’s happening in your pipelines. We’ll cover the mechanics of log ingestion, query patterns that reveal real problems, and how to build observability into your architecture from the start.

The Observability Stack: Where Cloud Logging Fits

Google Cloud’s observability suite consists of three interconnected services: Cloud Logging, Cloud Monitoring, and Cloud Trace. Each serves a different purpose, but they work together.

Cloud Logging captures raw events—what happened, when, and where. It’s the source of truth. A Dataflow worker crashed. A BigQuery query timed out. A Composer task failed. These are log events.

Cloud Monitoring aggregates metrics—numerical measurements over time. CPU usage, query latency, job completion rate. These are typically derived from logs or emitted directly by applications.

Cloud Trace tracks distributed requests across services. When a data request flows through multiple systems, Trace shows you the path and identifies bottlenecks.

For data pipelines, Cloud Logging and Cloud Monitoring form the core of your observability strategy. Trace is useful for latency-sensitive workloads but less critical for batch pipelines. The real power comes from understanding how to structure your logs so you can query them effectively and surface patterns that matter.

Think of it this way: if your data pipeline is a production system (and it should be treated as one), then Cloud Logging is your system’s nervous system. It tells you what’s happening. The question is whether you’re listening.

Logging in BigQuery: Query Execution and Performance

BigQuery is often treated as a black box. You submit a query, it returns results, and you move on. But BigQuery generates extensive logs that reveal exactly what’s happening inside that box: which tables were scanned, how much data was processed, whether slot reservations were used, and where query execution got stuck.

Enabling BigQuery Audit Logs

BigQuery writes audit logs to Cloud Logging automatically. There are two types:

Admin Activity logs capture metadata operations: dataset creation, table schema changes, permission modifications. These are always enabled and retained for 400 days at no cost.

Data Access logs capture read operations: queries executed, tables scanned, data exported. For most GCP services they are disabled by default because they generate high volume and incur ingestion costs, but BigQuery is the exception: its Data Access logs are enabled by default and cannot be turned off, so every query execution is already being captured. For other services in your pipeline, enable them under IAM & Admin > Audit Logs in your GCP project by selecting the service and checking the Data Read and Data Write log types.

The cost is real—a high-volume query workload can generate gigabytes of logs daily. But the alternative is flying blind. Building an audit log analysis pipeline with BigQuery on GCP is a standard pattern: ingest Cloud Logging data into a separate BigQuery dataset, then query it to understand your own usage.

What BigQuery Logs Tell You

When a query executes, BigQuery writes a log entry containing:

  • Query text: The actual SQL that ran
  • User identity: Who executed the query
  • Timestamp: When it started and ended
  • Bytes processed: How much data was scanned (critical for cost tracking)
  • Bytes billed: What you actually pay for (affected by caching, reservations)
  • Job ID: Unique identifier linking to other logs from the same query
  • Resource names: Which datasets and tables were accessed
  • Query plan: Execution stages and performance metrics

These fields let you answer real questions:

  • Which queries are the most expensive? (Sort by bytes_billed, identify outliers)
  • Which tables are accessed most frequently? (Join with table metadata, spot hot datasets)
  • Are queries hitting the cache or rescanning data? (Compare bytes_processed vs. bytes_billed)
  • Which users or services are generating unexpected load? (Filter by user_identity, aggregate by caller_ip)
  • Is a query slower than it should be? (Track query_duration over time, identify regressions)

For data pipeline observability, this means you can detect when a scheduled query suddenly becomes expensive (indicator of data quality issues or upstream changes), when a reporting dashboard query starts timing out (signal that the dataset has grown), or when an analytics team is running unoptimized queries at scale.
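Because bytes_billed translates directly into money, a quick back-of-the-envelope conversion is often the first analysis worth scripting. A minimal sketch, assuming the $6.25-per-TiB on-demand rate (check the current pricing page for your region and billing model):

```python
# Rough cost estimate from the bytes_billed field of a BigQuery audit log
# entry. The on-demand rate below is an assumption, not a quoted price.

ON_DEMAND_USD_PER_TIB = 6.25
TIB = 2**40  # bytes in a tebibyte

def estimate_query_cost(bytes_billed: int) -> float:
    """Return the approximate on-demand cost in USD for a single query."""
    return round(bytes_billed / TIB * ON_DEMAND_USD_PER_TIB, 4)

# A query billed for 2 TiB costs roughly $12.50 on demand.
print(estimate_query_cost(2 * TIB))  # 12.5
```

Run this over the bytes_billed values from your audit logs and the outliers tend to jump out immediately.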

Dataflow Logging: Job Execution and Worker Health

Dataflow is where the heavy lifting happens in most data pipelines. It processes terabytes of data, applies transformations, and writes results to BigQuery or Cloud Storage. But Dataflow jobs run across distributed workers, and distributed systems are inherently harder to observe.

Dataflow Logs and Job Metrics

Dataflow automatically writes logs to Cloud Logging from every worker. These logs include:

  • Worker startup and shutdown events: When workers scale up or down
  • Task execution logs: When individual pipeline stages complete
  • Error messages: Exceptions, failed records, timeouts
  • Custom application logs: Whatever your pipeline code writes to stdout/stderr

The key is that Dataflow logs are tied to a job ID, which lets you correlate all logs from a single pipeline run. If a job fails, you can query Cloud Logging for that job ID and see the complete execution history across all workers.

Instrumenting Dataflow Pipelines

Out of the box, Dataflow logs are basic. To make them useful for observability, you need to instrument your pipeline code.

In Apache Beam (the framework underlying Dataflow), use the logging library to emit structured logs:

import logging
from datetime import datetime

import apache_beam as beam

logger = logging.getLogger(__name__)

class MyDoFn(beam.DoFn):
  def process(self, element):
    try:
      result = transform(element)  # transform() is your pipeline's business logic
      logger.info("Processed element", extra={
        "element_id": element["id"],
        "result_size": len(result),
        "timestamp": datetime.now().isoformat()
      })
      yield result
    except Exception as e:
      # Log and drop; in production, consider routing failures to a dead-letter output
      logger.error("Failed to process element", extra={
        "element_id": element["id"],
        "error": str(e),
        "error_type": type(e).__name__
      })

Structured logging (using the extra parameter) ensures that fields are queryable in Cloud Logging. Avoid unstructured log messages like “Something went wrong”—they’re useless for observability.
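One way to make those fields reliably queryable is to emit each record as a single JSON line; logging agents on GCP typically parse JSON-formatted output into jsonPayload. A minimal sketch of such a formatter, using only the stdlib (this is not a Google library, just plain `logging`):

```python
import json
import logging
import sys

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line. When an agent
    ships this output to Cloud Logging, the JSON keys typically land as
    queryable jsonPayload fields."""

    # Attributes present on every LogRecord; anything else came from `extra`.
    _STANDARD = set(vars(logging.makeLogRecord({})))

    def format(self, record):
        payload = {"severity": record.levelname, "message": record.getMessage()}
        # Copy over only the custom fields passed via `extra`.
        payload.update({k: v for k, v in vars(record).items()
                        if k not in self._STANDARD})
        return json.dumps(payload)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("pipeline")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Processed element", extra={"element_id": "e-42", "result_size": 3})
```

Because the `extra` fields become top-level JSON keys, they are filterable as jsonPayload.element_id and so on in the Logs Explorer.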

Detecting Dataflow Problems

With proper instrumentation, Cloud Logging reveals:

  • Skewed data distribution: If some workers process 100x more records than others, it indicates a hot-key problem that will cause the job to stall
  • Transformation failures: If a specific transform is dropping records, logs will show which records failed and why
  • Resource exhaustion: If workers are running out of memory or disk, logs will show OOM errors or disk full messages
  • Slow stages: If a particular pipeline stage takes much longer than expected, logs will reveal it

Data pipeline observability architecture requires logging every pipeline component, not just high-level job status. This means instrumenting custom transforms, external API calls, and data quality checks.
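As a concrete illustration of the skew check, here is a minimal sketch that flags workers whose record counts dwarf the median. The 10x threshold is an assumption you would tune to your pipeline's normal variance:

```python
from statistics import median

def find_skewed_workers(counts: dict[str, int], ratio: float = 10.0) -> list[str]:
    """Flag workers whose record count exceeds `ratio` times the median,
    a cheap proxy for the hot-key skew described above."""
    m = median(counts.values())
    if m == 0:
        return []
    return sorted(w for w, n in counts.items() if n > ratio * m)

# Per-worker record counts, e.g. aggregated from Dataflow worker logs.
counts = {"worker-1": 1_000, "worker-2": 1_100, "worker-3": 950, "worker-4": 120_000}
print(find_skewed_workers(counts))  # ['worker-4']
```

In practice the input would come from aggregating per-worker log counts in your exported log table; a flagged worker is a strong hint that one key dominates the data.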

Cloud Composer Observability: Orchestration and Task Execution

Cloud Composer is GCP’s managed Airflow service. It orchestrates your data pipelines—triggering Dataflow jobs, running BigQuery queries, and coordinating dependencies. Unlike Dataflow, which processes data, Composer manages the workflow. But Composer still needs observability.

Airflow Logs in Cloud Logging

Cloud Composer automatically sends Airflow logs to Cloud Logging. These include:

  • DAG execution logs: When DAGs are triggered, whether manually or by schedule
  • Task execution logs: Stdout/stderr from each task
  • Scheduler logs: Whether the scheduler is running and picking up DAGs
  • Worker logs: Celery worker status (if using Celery executor)

Unlike BigQuery and Dataflow, which emit GCP-native log entries with well-defined resource types, Airflow logs are only as useful as what your DAG code writes. Composer forwards them to Cloud Logging by default, so the plumbing is handled; the work is making the log content itself structured and meaningful.

Instrumenting Airflow DAGs

In Airflow, use the built-in logger to emit structured information:

from airflow import DAG
from airflow.operators.python import PythonOperator
import logging

logger = logging.getLogger(__name__)

def extract_data(**context):
    execution_date = context["execution_date"]
    logger.info("Starting extraction", extra={
        "execution_date": execution_date.isoformat(),
        "task_id": context["task"].task_id,
        "dag_id": context["dag"].dag_id
    })

    # Your extraction logic
    rows_extracted = 1000000
    logger.info("Extraction complete", extra={
        "rows_extracted": rows_extracted,
        "duration_seconds": 45
    })

with DAG("data_pipeline", ...) as dag:
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_data
    )

Key fields to include in Composer logs:

  • execution_date: Which run of the DAG this is (critical for debugging retries)
  • task_id and dag_id: Which part of your workflow is running
  • Numeric metrics: Rows processed, duration, records failed
  • Status indicators: “starting”, “complete”, “failed”

Detecting Composer Issues

With proper logging, Cloud Logging reveals:

  • DAG scheduling problems: If a DAG isn’t being triggered on schedule, logs show whether the scheduler is running
  • Task dependencies failing: If a task depends on another task’s output, logs show what went wrong
  • Sensor timeouts: If a sensor is waiting for a condition that never arrives, logs show how long it waited
  • Resource contention: If multiple tasks are competing for resources, logs show execution times increasing
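The scheduling check in particular is easy to automate once run timestamps are available. A hypothetical sketch, assuming you can pull DAG run start times out of the exported logs; the 1.5x slack factor is an assumption:

```python
from datetime import datetime, timedelta

def find_schedule_gaps(run_times, expected_interval, slack=1.5):
    """Return (previous_run, next_run) pairs where the gap between two
    consecutive DAG runs exceeded `slack` times the expected interval."""
    runs = sorted(run_times)
    limit = expected_interval * slack
    return [(a, b) for a, b in zip(runs, runs[1:]) if b - a > limit]

daily = timedelta(days=1)
# A daily DAG that skipped the 4th and 5th.
runs = [datetime(2024, 1, d) for d in (1, 2, 3, 6, 7)]
print(find_schedule_gaps(runs, daily))
```

Feeding this from a scheduled query over your log sink gives you a missed-run detector without touching the Airflow scheduler itself.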

Unified Querying: The Logs Explorer

Now you have logs from BigQuery, Dataflow, and Composer all flowing into Cloud Logging. The next step is actually querying them to understand what’s happening.

Using the Logs Explorer

The Logs Explorer is Cloud Logging’s query interface. It uses a filter syntax that looks like:

resource.type="bigquery_resource"
severity="ERROR"
timestamp>="2024-01-15T00:00:00Z"

You can filter by:

  • resource.type: What generated the log (bigquery_resource, dataflow_job, gce_instance)
  • severity: ERROR, WARNING, INFO, DEBUG
  • timestamp: When the log was generated
  • labels: Custom tags you’ve added to logs
  • jsonPayload: Structured data you’ve logged

Cloud Logging's Log Analytics feature goes a step further, letting you run SQL-style analysis and generate insights directly in Cloud Logging without exporting to BigQuery first.

Here are practical queries for data pipeline observability:

Find all failed BigQuery queries in the last hour:

resource.type="bigquery_resource"
severity="ERROR"
timestamp>="2024-01-15T23:00:00Z"

Find Dataflow jobs that processed more than 1TB:

resource.type="dataflow_job"
jsonPayload.bytes_processed>1000000000000

Find Cloud Composer tasks that took longer than 30 minutes:

resource.type="cloud_composer_environment"
jsonPayload.duration_seconds>1800

Find all queries from a specific service account:

resource.type="bigquery_resource"
protoPayload.authenticationInfo.principalEmail="pipeline@myproject.iam.gserviceaccount.com"
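If you compose these filters programmatically, say in scripts or runbooks, a small helper keeps them consistent. This is a convenience sketch for this article, not part of any Google SDK; extra keyword arguments become jsonPayload equality clauses:

```python
def build_log_filter(resource_type=None, severity=None, since=None, **fields):
    """Assemble a Logs Explorer filter string from keyword arguments.
    Hypothetical helper; extra kwargs map to jsonPayload.<key>="<value>"."""
    clauses = []
    if resource_type:
        clauses.append(f'resource.type="{resource_type}"')
    if severity:
        clauses.append(f'severity="{severity}"')
    if since:
        clauses.append(f'timestamp>="{since}"')
    clauses += [f'jsonPayload.{k}="{v}"' for k, v in fields.items()]
    return "\n".join(clauses)

print(build_log_filter(resource_type="dataflow_job", severity="ERROR",
                       since="2024-01-15T00:00:00Z", job_name="daily-load"))
```

The resulting string can be pasted into the Logs Explorer or passed to the Cloud Logging API's entry-listing filter parameter.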

Exporting Logs to BigQuery

For serious observability, export Cloud Logging data to BigQuery. This lets you run SQL queries on your logs and build dashboards.

Create a log sink in Cloud Logging:

  1. Go to Cloud Logging > Logs Router
  2. Create a new sink
  3. Set the destination to a BigQuery dataset
  4. Define a filter (e.g., all logs from your project)
  5. Click Create

Cloud Logging will now write all matching logs to a BigQuery table. The table schema is automatically created, with columns for timestamp, severity, resource, jsonPayload, and more.

Once in BigQuery, you can run SQL to analyze your pipeline:

SELECT
  jsonPayload.job_id,
  jsonPayload.query,
  jsonPayload.bytes_billed,
  jsonPayload.query_duration_seconds,
  jsonPayload.user_identity
FROM `myproject.cloud_logging.cloudaudit_googleapis_com_activity`
WHERE resource.type = "bigquery_resource"
  AND jsonPayload.bytes_billed > 1000000000000
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
ORDER BY jsonPayload.bytes_billed DESC

This query finds your most expensive queries from the last week—essential for cost optimization.

Building Real Observability: Beyond Logs

Logs are raw data. Observability comes from turning those logs into actionable insights. This means building dashboards, setting up alerts, and establishing runbooks.

Creating Observability Dashboards

Once logs are in BigQuery, you can create dashboards with D23, which provides embedded analytics and self-serve BI on top of Apache Superset, or with Looker Studio or any other BI tool. The key is tracking metrics that matter for your pipelines:

Data Freshness: How often are your datasets updated? Are pipelines running on schedule?

SELECT
  DATE(timestamp) as date,
  COUNT(*) as pipeline_runs,
  COUNTIF(status = "SUCCESS") as successful_runs,
  COUNTIF(status = "FAILED") as failed_runs,
  ROUND(100 * COUNTIF(status = "SUCCESS") / COUNT(*), 2) as success_rate
FROM `myproject.pipeline_logs.execution_log`
GROUP BY date
ORDER BY date DESC

Query Performance: Are BigQuery queries getting slower?

SELECT
  DATE(timestamp) as date,
  APPROX_QUANTILES(jsonPayload.query_duration_seconds, 100)[OFFSET(50)] as p50_duration,
  APPROX_QUANTILES(jsonPayload.query_duration_seconds, 100)[OFFSET(95)] as p95_duration,
  APPROX_QUANTILES(jsonPayload.query_duration_seconds, 100)[OFFSET(99)] as p99_duration,
  MAX(jsonPayload.query_duration_seconds) as max_duration
FROM `myproject.cloud_logging.cloudaudit_googleapis_com_activity`
WHERE resource.type = "bigquery_resource"
GROUP BY date
ORDER BY date DESC

Data Quality: Are transformations dropping records?

SELECT
  jsonPayload.transform_name,
  SUM(jsonPayload.records_input) as total_input,
  SUM(jsonPayload.records_output) as total_output,
  ROUND(100 * (1 - SUM(jsonPayload.records_output) / SUM(jsonPayload.records_input)), 2) as drop_rate
FROM `myproject.cloud_logging.dataflow_logs`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY jsonPayload.transform_name
HAVING drop_rate > 5
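The drop-rate arithmetic is easy to sanity-check in plain Python before wiring it into SQL. Transform names and counts below are illustrative:

```python
def drop_rate(records_input: int, records_output: int) -> float:
    """Percentage of records lost between a transform's input and output,
    mirroring the SQL drop_rate expression above."""
    if records_input == 0:
        return 0.0
    return round(100 * (1 - records_output / records_input), 2)

# Per-transform (input, output) counts, e.g. summed from Dataflow logs.
stages = {"parse": (1_000_000, 999_500), "enrich": (999_500, 900_000)}
flagged = {name: drop_rate(i, o) for name, (i, o) in stages.items()
           if drop_rate(i, o) > 5}
print(flagged)  # {'enrich': 9.95}
```

A transform quietly dropping nearly 10% of records, as "enrich" does here, is exactly the kind of signal that never surfaces from job status alone.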

Setting Up Alerts

Dashboards are passive—they show you what happened. Alerts are active—they tell you when something is wrong.

In Cloud Monitoring, create alert policies based on your logs:

Alert: Pipeline failure rate exceeds 10%

resource.type="cloud_composer_environment"
jsonPayload.status="FAILED"

Set the condition: if the rate of failed tasks exceeds 10% of all tasks in a 5-minute window, trigger an alert.

Alert: BigQuery query latency p95 exceeds 30 seconds

resource.type="bigquery_resource"

Set the condition: if the 95th percentile query duration exceeds 30 seconds, trigger an alert.

Alert: Dataflow job processing rate drops below expected

resource.type="dataflow_job"

Set the condition: if records per second drops below your baseline, trigger an alert.

The key to effective alerting is avoiding noise. Every alert should require action. If you’re getting paged for alerts you ignore, you’ve lost observability.

Advanced Patterns: Correlation and Root Cause Analysis

When something goes wrong in your data pipeline, it rarely fails in isolation. A Dataflow job fails because a BigQuery table isn’t ready. A Composer DAG times out because a query is slow. Understanding these correlations is where observability becomes powerful.

Tracing Data Through the Pipeline

The most useful pattern is adding a trace ID to logs across your entire pipeline. When a record enters your pipeline, assign it a unique ID and include that ID in every log from every service.

In your Composer DAG, generate a trace ID:

import uuid

def extract_data(**context):
    trace_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="trace_id", value=trace_id)
    logger.info("Starting extraction", extra={"trace_id": trace_id})

Pass the trace ID to Dataflow:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplateOperator

transform = DataflowTemplateOperator(
    task_id="transform",
    template="gs://my-templates/transform",
    parameters={
        "trace_id": "{{ ti.xcom_pull(task_ids='extract', key='trace_id') }}"
    }
)

In your Dataflow code, include the trace ID in all logs:

class MyDoFn(beam.DoFn):
  def __init__(self, trace_id):
    self.trace_id = trace_id

  def process(self, element):
    logger.info("Processing element", extra={"trace_id": self.trace_id, "element_id": element["id"]})

Now, when something goes wrong, you can query Cloud Logging for a specific trace ID and see the complete path of a record through your entire pipeline:

jsonPayload.trace_id="abc123def456"

This query returns every log entry related to that record, across Composer, Dataflow, and BigQuery. Instead of looking at millions of logs, you’re looking at the specific execution path that matters.
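Once those entries are exported (or pulled via the API), reconstructing the path is a filter-and-sort. A sketch over in-memory dicts standing in for exported log rows:

```python
def execution_path(entries, trace_id):
    """Reconstruct one record's path through the pipeline: keep only the
    entries carrying this trace_id, ordered by timestamp."""
    matched = [e for e in entries if e.get("trace_id") == trace_id]
    return sorted(matched, key=lambda e: e["timestamp"])

# Illustrative rows; ISO-8601 timestamps sort correctly as strings.
entries = [
    {"timestamp": "2024-01-15T02:00:05Z", "service": "dataflow", "trace_id": "abc123"},
    {"timestamp": "2024-01-15T02:00:00Z", "service": "composer", "trace_id": "abc123"},
    {"timestamp": "2024-01-15T02:00:09Z", "service": "bigquery", "trace_id": "abc123"},
    {"timestamp": "2024-01-15T02:00:01Z", "service": "composer", "trace_id": "zzz999"},
]
print([e["service"] for e in execution_path(entries, "abc123")])
# ['composer', 'dataflow', 'bigquery']
```

The ordered service list is effectively a poor man's distributed trace, built entirely from structured logs.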

Correlation Analysis

Beyond individual trace IDs, you can correlate patterns across logs:

When BigQuery queries get slow, do Dataflow jobs also get slow?

WITH bigquery_latency AS (
  SELECT
    TIMESTAMP_TRUNC(timestamp, HOUR) as hour,
    APPROX_QUANTILES(jsonPayload.query_duration_seconds, 100)[OFFSET(95)] as bq_p95
  FROM `myproject.cloud_logging.cloudaudit_googleapis_com_activity`
  WHERE resource.type = "bigquery_resource"
  GROUP BY hour
),
dataflow_latency AS (
  SELECT
    TIMESTAMP_TRUNC(timestamp, HOUR) as hour,
    APPROX_QUANTILES(jsonPayload.processing_time_seconds, 100)[OFFSET(95)] as df_p95
  FROM `myproject.cloud_logging.dataflow_logs`
  GROUP BY hour
)
SELECT
  bq.hour,
  bq.bq_p95,
  df.df_p95,
  CORR(bq.bq_p95, df.df_p95) OVER (ORDER BY bq.hour ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) as correlation
FROM bigquery_latency bq
JOIN dataflow_latency df ON bq.hour = df.hour
ORDER BY bq.hour DESC

If BigQuery and Dataflow latency are correlated, the issue is likely shared infrastructure (network, storage). If they’re independent, the issue is specific to one service.
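The statistic behind that query is plain Pearson correlation, which you can reproduce outside BigQuery to validate the SQL. The hourly latency series below are made up for illustration:

```python
from math import sqrt

def pearson(xs, ys):
    """Plain Pearson correlation, the same statistic BigQuery's CORR()
    computes over the joined latency series."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

bq_p95 = [12.0, 14.5, 13.1, 30.2, 28.9, 12.4]  # hourly BigQuery p95 latency (s)
df_p95 = [41.0, 44.2, 42.5, 88.0, 85.5, 43.0]  # hourly Dataflow p95 latency (s)

print(round(pearson(bq_p95, df_p95), 3))
```

A value near 1.0, as this example produces, points at a shared cause; values near zero suggest the services are degrading independently.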

Practical Implementation: From Logs to Insights

Here’s a concrete example of implementing observability for a real data pipeline:

The Pipeline: A Composer DAG that runs daily, extracts data from a source system using Dataflow, loads it into BigQuery, and serves it via dashboards.

The Problem: The pipeline takes 2 hours on Monday but 4 hours on Friday. You need to understand why.

The Solution:

  1. Instrument each component: Add structured logging to the Composer DAG, Dataflow job, and BigQuery loads
  2. Export logs to BigQuery: Create a log sink so you can query logs with SQL
  3. Analyze by day of week:
SELECT
  FORMAT_TIMESTAMP("%A", timestamp) as day_of_week,
  COUNT(DISTINCT DATE(timestamp)) as num_runs,
  APPROX_QUANTILES(jsonPayload.duration_seconds, 100)[OFFSET(50)] as p50_duration,
  APPROX_QUANTILES(jsonPayload.duration_seconds, 100)[OFFSET(95)] as p95_duration,
  MAX(jsonPayload.duration_seconds) as max_duration
FROM `myproject.cloud_logging.pipeline_logs`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
GROUP BY day_of_week
ORDER BY day_of_week
  4. Drill down into Friday runs:
SELECT
  jsonPayload.stage,
  COUNT(*) as num_runs,
  APPROX_QUANTILES(jsonPayload.duration_seconds, 100)[OFFSET(95)] as p95_duration
FROM `myproject.cloud_logging.pipeline_logs`
WHERE FORMAT_TIMESTAMP("%A", timestamp) = "Friday"
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 DAY)
GROUP BY jsonPayload.stage
ORDER BY p95_duration DESC

This reveals which stage is slow on Fridays. Maybe the Dataflow job takes longer because the source system is busier. Maybe the BigQuery load is slower because the dataset is larger. Once you know which stage, you can investigate further.

  5. Set up alerts: Create an alert if Friday pipeline duration exceeds the historical p95, so you know immediately when something changes.
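The day-of-week aggregation from step 3 is also straightforward to prototype locally before writing the SQL. A sketch using a nearest-rank p95 (a rough stand-in for APPROX_QUANTILES) over illustrative (weekday, duration) rows:

```python
from collections import defaultdict

def p95(values):
    """Nearest-rank 95th percentile, a rough stand-in for APPROX_QUANTILES."""
    ordered = sorted(values)
    idx = min(len(ordered) - 1, int(0.95 * len(ordered)))
    return ordered[idx]

def duration_by_weekday(runs):
    """Group (weekday, duration_seconds) pairs and compute p95 per day,
    mirroring the day-of-week SQL on rows from the exported log table."""
    buckets = defaultdict(list)
    for weekday, duration in runs:
        buckets[weekday].append(duration)
    return {day: p95(vals) for day, vals in buckets.items()}

# Illustrative runs: ~2h on Mondays, ~4h on Fridays.
runs = [("Monday", 7200), ("Monday", 7300), ("Friday", 14100), ("Friday", 14800)]
print(duration_by_weekday(runs))  # {'Monday': 7300, 'Friday': 14800}
```

Seeing Friday's p95 at twice Monday's confirms the pattern is systematic rather than a one-off slow run.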

Integration with Your Analytics Platform

While Cloud Logging and Cloud Monitoring are powerful, many organizations want to visualize pipeline observability alongside their business metrics. D23 provides embedded analytics and self-serve BI capabilities that can integrate with your observability data.

You can export your pipeline logs to BigQuery, then use D23’s API-first approach and self-serve BI features to create dashboards that show both operational metrics (pipeline latency, failure rates) and business metrics (data freshness, query costs) in one place.

This is particularly valuable for data leaders who need to communicate pipeline health to stakeholders. Instead of maintaining separate observability dashboards and business dashboards, you have a unified view of how your data infrastructure supports your analytics.

Best Practices for Data Pipeline Observability

Implementing observability isn’t a one-time project—it’s a practice. Here are the core principles:

1. Instrument from the start: Don’t add logging as an afterthought. When you build a new pipeline, include structured logging from day one.

2. Use structured logging: Avoid unstructured messages. Use JSON payloads with queryable fields.

3. Include context: Every log should include enough information to understand what was happening (task ID, execution date, user, resource).

4. Log at the right level: DEBUG for development, INFO for normal operation, WARNING for unexpected conditions, ERROR for failures.

5. Set up alerts early: Don’t wait for a production incident to realize you need monitoring. Define what “normal” looks like and alert when you deviate from it.

6. Correlate signals: Use trace IDs and timestamps to connect logs across services.

7. Review and iterate: Observability is never complete. As your pipelines grow, your observability needs to grow with them.

Common Pitfalls and How to Avoid Them

Pitfall 1: Logging too much

Cloud Logging charges by ingestion volume. If you log every record processed by Dataflow, your logs will cost more than your compute. Log at the task level (“processed 1M records”), not the record level.
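One pattern for task-level logging is to accumulate counts in memory and flush a single structured entry per batch. A minimal sketch, assuming a structured-logging setup like the one shown earlier:

```python
import logging

logger = logging.getLogger(__name__)

class BatchLogCounter:
    """Count per-record outcomes in memory and emit one summary log per
    batch instead of one entry per record, keeping ingestion costs
    proportional to tasks rather than rows."""

    def __init__(self, batch_size: int = 1_000_000):
        self.batch_size = batch_size
        self.processed = 0
        self.failed = 0

    def record(self, ok: bool) -> None:
        self.processed += 1
        if not ok:
            self.failed += 1
        if self.processed >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Emit a single structured summary, then reset the counters.
        if self.processed:
            logger.info("batch complete", extra={
                "records_processed": self.processed,
                "records_failed": self.failed,
            })
        self.processed = 0
        self.failed = 0

counter = BatchLogCounter(batch_size=3)
for ok in (True, True, False, True):
    counter.record(ok)
counter.flush()  # one summary entry per batch, not one per record
```

A million processed records becomes one log line with records_processed and records_failed fields, which is all the alerting layer needs anyway.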

Pitfall 2: Logs with no context

A log that says “Error: Connection refused” is useless without context. Which service? Which resource? When? Include enough information to act on the log.

Pitfall 3: Alerts that don’t alert

If you have 50 alerts and only 5 of them require action, you’ll ignore all of them. Be selective. Every alert should be actionable.

Pitfall 4: No runbooks

When an alert fires, your team needs to know what to do. Document runbooks for common issues: “If pipeline failure rate exceeds 10%, check if the source system is down.”

Pitfall 5: Forgetting about retention

Cloud Logging retains logs for 30 days by default. If you need longer retention, export to BigQuery. But be aware of storage costs.

Conclusion: From Blind to Visible

Data pipelines are critical infrastructure. When they fail, your analytics fail. When they’re slow, your decisions are delayed. When they’re inefficient, your costs skyrocket.

Google Cloud Logging provides the foundation for observability across your entire data stack, but only if you implement it thoughtfully. This means instrumenting BigQuery, Dataflow, and Composer with structured logging. It means exporting logs to BigQuery so you can analyze them with SQL. It means building dashboards and alerts that actually tell you what’s happening.

The investment pays dividends. When you have visibility into your pipelines, you can optimize them. You can catch problems before they affect users. You can understand the true cost of your analytics infrastructure.

Start small: pick one critical pipeline, add structured logging, export logs to BigQuery, and build a dashboard. Once you see the value, expand to your entire data platform. Organizations implementing observability in GCP see dramatic improvements in reliability and cost efficiency.

Your data pipelines deserve to be as observable as your applications. Cloud Logging is the tool. The question is whether you’re ready to use it.