Dataform Workflows

Dataform is Google Cloud’s SQL-based data transformation tool, now integrated directly into BigQuery. It solves the main challenges with managing GA4 BigQuery queries: versioning, dependencies between derived tables, incremental processing, and automated scheduling.

Instead of running individual SQL queries manually or with cron jobs, Dataform manages a dependency graph of SQL transformations — ensuring tables are built in the right order, incrementally updated daily, and properly documented.

Use Dataform when:

  • You have multiple derived tables that depend on each other (events → sessions → daily_metrics → dashboard_data)
  • You need daily incremental updates without reprocessing the full history
  • You want version control for your SQL transformations
  • Multiple analysts are working on the same data models

Stick with standalone SQL when:

  • You are doing ad-hoc analysis
  • You have only one or two derived tables
  • You do not need incremental processing

Dataform is available in the BigQuery console under Dataform in the left menu, or via the open-source Dataform CLI for local development.

  1. Go to BigQuery → Dataform → Create repository.

  2. Choose a repository name and region (match your BigQuery dataset region).

  3. Connect to a git repository (GitHub, GitLab, or Cloud Source Repositories). This is where your Dataform project files live.

  4. Create a workspace (a working branch) to develop your transformations.

A Dataform project for GA4 data typically looks like:

dataform-project/
├── definitions/
│   ├── sources/
│   │   └── ga4_events.sqlx          # Declare GA4 raw tables as a source
│   ├── staging/
│   │   └── stg_ga4_events.sqlx      # Clean, standardize raw events
│   ├── intermediate/
│   │   ├── int_sessions.sqlx        # Session-level aggregation
│   │   └── int_events_flat.sqlx     # Flattened event params
│   └── marts/
│       ├── fct_sessions.sqlx        # Final sessions fact table
│       ├── fct_purchases.sqlx       # Purchase transactions
│       └── dim_users.sqlx           # User dimension
├── includes/
│   └── channel_classification.js    # Shared JS function for channel logic
└── dataform.json                    # Project configuration
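A minimal dataform.json for this layout might look like the following. This is a sketch: the project ID, region, and schema names are placeholders, and newer Dataform releases put the same settings in workflow_settings.yaml instead.

```json
{
  "warehouse": "bigquery",
  "defaultDatabase": "your-gcp-project",
  "defaultSchema": "analytics_derived",
  "defaultLocation": "EU",
  "assertionSchema": "dataform_assertions"
}
```

defaultLocation must match the region of the GA4 export dataset, or queries joining the two will fail.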

Declare the GA4 export tables as a Dataform source:

-- definitions/sources/ga4_events.sqlx
config {
  type: "declaration",
  database: "your-gcp-project",
  schema: "analytics_PROPERTY_ID",
  name: "events_*"
}
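Downstream models reference this declaration with ${ref()}. A minimal staging view over it might look like this (a sketch; the analytics_staging schema name is an assumption, not part of the project above):

```sql
-- definitions/staging/stg_ga4_events.sqlx (sketch)
config {
  type: "view",
  schema: "analytics_staging"
}

SELECT
  user_pseudo_id,
  event_name,
  PARSE_DATE('%Y%m%d', event_date) AS event_date,
  TIMESTAMP_MICROS(event_timestamp) AS event_timestamp,
  event_params
FROM ${ref("events_*")}
```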

The most important pattern: an incremental sessions table that processes only new days:

-- definitions/intermediate/int_sessions.sqlx
config {
  type: "incremental",
  schema: "analytics_derived",
  name: "sessions",
  uniqueKey: ["user_pseudo_id", "session_id"],
  partitionBy: {
    field: "session_date",
    dataType: "date"
  },
  clusterBy: ["source", "medium"],
  bigquery: {
    partitionExpirationDays: 730
  },
  description: "One row per GA4 session with traffic source, device, and conversion data.",
  columns: {
    user_pseudo_id: "Anonymous user identifier from GA4 (_ga cookie)",
    session_id: "GA4 session ID (ga_session_id event parameter)",
    session_date: "Date of session start",
    source: "Traffic source of the session",
    medium: "Traffic medium of the session",
    channel: "Channel group (custom classification)"
  }
}

-- Incremental predicate: only process days newer than what is already loaded
pre_operations {
  DECLARE max_loaded_date DEFAULT (
    ${when(incremental(),
      `SELECT MAX(session_date) FROM ${self()}`,
      `DATE '2020-01-01'`
    )}
  );
}

SELECT
  user_pseudo_id,
  (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') AS session_id,
  PARSE_DATE('%Y%m%d', MIN(event_date)) AS session_date,
  -- Traffic source (from session_start events only)
  MAX(CASE WHEN event_name = 'session_start' THEN traffic_source.source END) AS source,
  MAX(CASE WHEN event_name = 'session_start' THEN traffic_source.medium END) AS medium,
  MAX(CASE WHEN event_name = 'session_start' THEN traffic_source.name END) AS campaign,
  -- Channel classification (see includes section)
  ${includes.channel_classification.classifyChannel(
    "MAX(CASE WHEN event_name = 'session_start' THEN traffic_source.source END)",
    "MAX(CASE WHEN event_name = 'session_start' THEN traffic_source.medium END)"
  )} AS channel,
  MAX(device.category) AS device_category,
  MAX(device.operating_system) AS operating_system,
  MAX(geo.country) AS country,
  COUNTIF(event_name = 'page_view') AS pageviews,
  COUNTIF(event_name = 'purchase') AS purchases,
  SUM(CASE WHEN event_name = 'purchase' THEN ecommerce.purchase_revenue ELSE 0 END) AS revenue,
  MAX(CASE WHEN (SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'session_engaged') = '1'
      THEN 1 ELSE 0 END) AS is_engaged,
  -- Must be aggregated: event_params is not part of the GROUP BY
  MAX((SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_number')) AS session_number
FROM ${ref("events_*")}
WHERE _TABLE_SUFFIX BETWEEN
    FORMAT_DATE('%Y%m%d', DATE_ADD(max_loaded_date, INTERVAL 1 DAY)) AND
    FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
  AND (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') IS NOT NULL
GROUP BY user_pseudo_id, session_id

Put reusable logic in includes:

// includes/channel_classification.js
function classifyChannel(sourceField, mediumField) {
  return `
    CASE
      WHEN ${sourceField} IN ('(direct)', '') AND ${mediumField} IN ('(none)', '(not set)', '')
        THEN 'Direct'
      WHEN REGEXP_CONTAINS(LOWER(${mediumField}), r'email|e-mail')
        THEN 'Email'
      WHEN LOWER(${mediumField}) IN ('cpc', 'ppc', 'paid')
        AND REGEXP_CONTAINS(LOWER(${sourceField}), r'google|bing|yahoo|baidu')
        THEN 'Paid Search'
      WHEN LOWER(${mediumField}) IN ('cpc', 'ppc', 'paid')
        AND REGEXP_CONTAINS(LOWER(${sourceField}), r'facebook|instagram|twitter|linkedin|tiktok')
        THEN 'Paid Social'
      WHEN LOWER(${mediumField}) = 'organic'
        AND REGEXP_CONTAINS(LOWER(${sourceField}), r'google|bing|yahoo|duckduckgo')
        THEN 'Organic Search'
      WHEN REGEXP_CONTAINS(LOWER(${sourceField}), r'facebook|instagram|twitter|linkedin|tiktok')
        AND LOWER(${mediumField}) NOT IN ('cpc', 'ppc', 'paid')
        THEN 'Organic Social'
      WHEN LOWER(${mediumField}) = 'referral'
        THEN 'Referral'
      ELSE 'Unassigned'
    END
  `;
}

module.exports = { classifyChannel };
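The same pattern works for other repetitive SQL fragments. As an illustration, here is a hypothetical companion helper (not part of the project above) that generates the UNNEST subquery used throughout int_sessions to extract a GA4 event parameter; because includes are plain JavaScript, you can run and inspect them in Node outside Dataform:

```javascript
// includes/event_params.js -- hypothetical helper: builds the repetitive
// UNNEST subquery used to pull one GA4 event parameter out of event_params.
function eventParam(key, valueType = "string_value") {
  // valueType is one of: string_value, int_value, double_value, float_value
  return `(SELECT value.${valueType} FROM UNNEST(event_params) WHERE key = '${key}')`;
}

module.exports = { eventParam };

// Quick check in plain Node, outside Dataform:
console.log(eventParam("ga_session_id", "int_value"));
// (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id')
```

In a .sqlx file this would be called as ${includes.event_params.eventParam("ga_session_id", "int_value")}, keeping the extraction logic in one place.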

A daily metrics table that depends on the sessions table:

-- definitions/marts/fct_daily_metrics.sqlx
config {
  type: "table",
  schema: "analytics_marts",
  name: "daily_metrics",
  description: "Daily summary metrics for dashboards"
}

SELECT
  session_date,
  channel,
  device_category,
  country,
  COUNT(*) AS sessions,
  SUM(is_engaged) AS engaged_sessions,
  COUNT(DISTINCT user_pseudo_id) AS users,
  SUM(pageviews) AS pageviews,
  SUM(purchases) AS transactions,
  SUM(revenue) AS revenue,
  ROUND(SUM(revenue) / NULLIF(SUM(purchases), 0), 2) AS avg_order_value,
  ROUND(SUM(is_engaged) / COUNT(*) * 100, 1) AS engagement_rate_pct
FROM ${ref("sessions")} -- automatically depends on int_sessions
WHERE session_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
GROUP BY session_date, channel, device_category, country

Dataform automatically detects the ${ref("sessions")} dependency and ensures sessions is built before fct_daily_metrics.

Dataform supports assertions (data quality tests) that run after each transformation:

-- definitions/tests/assert_sessions_no_null_date.sqlx
config {
  type: "assertion",
  description: "Verify no sessions have null session_date"
}

SELECT *
FROM ${ref("sessions")}
WHERE session_date IS NULL

-- definitions/tests/assert_sessions_positive_revenue.sqlx
config {
  type: "assertion",
  description: "Revenue should never be negative"
}

SELECT *
FROM ${ref("sessions")}
WHERE revenue < 0

If an assertion returns any rows, the Dataform run fails and downstream tables are not updated, preventing bad data from propagating.
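Dataform also lets you declare common checks inline in a table's own config block via the assertions property, instead of writing standalone assertion files. A sketch of the same checks attached directly to the sessions model:

```sql
config {
  type: "incremental",
  name: "sessions",
  assertions: {
    uniqueKey: ["user_pseudo_id", "session_id"],
    nonNull: ["session_date"],
    rowConditions: ["revenue >= 0"]
  }
}
```

Inline assertions are convenient for per-table invariants; standalone assertion files remain useful for checks spanning multiple tables.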

Set up a release configuration and workflow schedule in Dataform:

  1. In the Dataform console, go to Release configurations → Create.

  2. Set the git ref (branch or tag) to run.

  3. Create a Workflow configuration → set schedule (e.g., daily at 08:00 UTC, after GA4 daily export completes).

  4. Select which tables to include (all, or specific tags).

  5. Save the schedule.

GA4 daily export typically completes between midnight and 8 AM in the property’s time zone. Schedule Dataform to run after this window to ensure new data is available.

Using a full refresh instead of incremental

A type: "table" (full refresh) recreates the entire table on each run, reprocessing years of raw events every day. For any table built from more than about a week of raw data, use type: "incremental" with a proper incremental predicate.

Incremental tables need to be bootstrapped with the full history on the first run. In the pre_operations block, use ${when(incremental(), ..., ...)} to handle both the initial load (full history) and subsequent incremental runs (new days only).
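An alternative to declaring a variable in pre_operations is to inline the predicate directly in the WHERE clause with ${when(incremental(), ...)}. A sketch of this pattern for the sessions model:

```sql
-- Sketch: inline incremental predicate, no pre_operations block needed
FROM ${ref("events_*")}
WHERE ${when(incremental(),
  `_TABLE_SUFFIX > FORMAT_DATE('%Y%m%d', (SELECT MAX(session_date) FROM ${self()}))`,
  `_TABLE_SUFFIX >= '20200101'`
)}
```

On the first run the second branch is compiled in (full history from the start date); on later runs the first branch restricts the scan to day partitions newer than what ${self()} already contains.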

If table B depends on table A but you do not use ${ref("table_a")} in table B’s query, Dataform does not know about the dependency. Tables may build in the wrong order. Always reference upstream tables with ${ref()} rather than hardcoding the table name.
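For example:

```sql
-- Invisible to the dependency graph: may build before sessions exists
FROM `your-gcp-project.analytics_derived.sessions`

-- Recorded as a dependency: sessions is always built first
FROM ${ref("sessions")}
```

${ref()} also resolves the fully qualified table name from dataform.json, so the same code works across projects and environments.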