Skip to main content

AWS Glue setup

Community plugin

Some core functionality may be limited. If you're interested in contributing, check out the source code for each repository listed below.

  • Maintained by: Community
  • Authors: Benjamin Menuet, Moshir Mikael, Armando Segnini and Amine El Mallem
  • GitHub repo: aws-samples/dbt-glue
  • PyPI package: dbt-glue
  • Slack channel: #db-glue
  • Supported dbt Core version: v0.24.0 and newer
  • dbt Cloud support: Not Supported
  • Minimum data platform version: Glue 2.0

Installing dbt-glue

Use pip to install the adapter. Before 1.8, installing the adapter would automatically install dbt-core and any additional dependencies. Beginning in 1.8, installing an adapter does not automatically install dbt-core. This is because adapters and dbt Core versions have been decoupled from each other so we no longer want to overwrite existing dbt-core installations. Use the following command for installation:

Configuring dbt-glue

For AWS Glue-specific configuration, please refer to AWS Glue configs.

For further (and more likely up-to-date) info, see the README

Connection Methods

Configuring your AWS profile for Glue Interactive Session

There are two IAM principals used with interactive sessions.

  • Client principal: The principal (either user or role) calling the AWS APIs (Glue, Lake Formation, Interactive Sessions) from the local client. This is the principal configured in the AWS CLI and is likely the same.
  • Service role: The IAM role that AWS Glue uses to execute your session. This is the same as AWS Glue ETL.

Read this documentation to configure these principals.

You will find below a least privileged policy to enjoy all features of dbt-glue adapter.

Please to update variables between <>, here are explanations of these arguments:

ArgsDescription
regionThe region where your Glue database is stored
AWS AccountThe AWS account where you run your pipeline
dbt output databaseThe database updated by dbt (this is the schema configured in the profile.yml of your dbt environment)
dbt source databaseAll databases used as source
dbt output bucketThe bucket name where the data will be generated by dbt (the location configured in the profile.yml of your dbt environment)
dbt source bucketThe bucket name of source databases (if they are not managed by Lake Formation)
sample_IAM_Policy.yml
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Read_and_write_databases",
"Action": [
"glue:SearchTables",
"glue:BatchCreatePartition",
"glue:CreatePartitionIndex",
"glue:DeleteDatabase",
"glue:GetTableVersions",
"glue:GetPartitions",
"glue:DeleteTableVersion",
"glue:UpdateTable",
"glue:DeleteTable",
"glue:DeletePartitionIndex",
"glue:GetTableVersion",
"glue:UpdateColumnStatisticsForTable",
"glue:CreatePartition",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:GetTables",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"glue:UpdateColumnStatisticsForPartition",
"glue:CreateDatabase",
"glue:BatchDeleteTableVersion",
"glue:BatchDeleteTable",
"glue:DeletePartition",
"glue:GetUserDefinedFunctions",
"lakeformation:ListResources",
"lakeformation:BatchGrantPermissions",
"lakeformation:ListPermissions",
"lakeformation:GetDataAccess",
"lakeformation:GrantPermissions",
"lakeformation:RevokePermissions",
"lakeformation:BatchRevokePermissions",
"lakeformation:AddLFTagsToResource",
"lakeformation:RemoveLFTagsFromResource",
"lakeformation:GetResourceLFTags",
"lakeformation:ListLFTags",
"lakeformation:GetLFTag",
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:catalog",
"arn:aws:glue:<region>:<AWS Account>:table/<dbt output database>/*",
"arn:aws:glue:<region>:<AWS Account>:database/<dbt output database>"
],
"Effect": "Allow"
},
{
"Sid": "Read_only_databases",
"Action": [
"glue:SearchTables",
"glue:GetTableVersions",
"glue:GetPartitions",
"glue:GetTableVersion",
"glue:GetTables",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetDatabase",
"glue:GetPartition",
"lakeformation:ListResources",
"lakeformation:ListPermissions"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:table/<dbt source database>/*",
"arn:aws:glue:<region>:<AWS Account>:database/<dbt source database>",
"arn:aws:glue:<region>:<AWS Account>:database/default",
"arn:aws:glue:<region>:<AWS Account>:database/global_temp"
],
"Effect": "Allow"
},
{
"Sid": "Storage_all_buckets",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<dbt output bucket>",
"arn:aws:s3:::<dbt source bucket>"
],
"Effect": "Allow"
},
{
"Sid": "Read_and_write_buckets",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<dbt output bucket>"
],
"Effect": "Allow"
},
{
"Sid": "Read_only_buckets",
"Action": [
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::<dbt source bucket>"
],
"Effect": "Allow"
}
]
}

Configuration of the local environment

Because dbt and dbt-glue adapters are compatible with Python versions 3.7, 3.8, and 3.9, check the version of Python:

$ python3 --version

Configure a Python virtual environment to isolate package version and code dependencies:

$ sudo yum install git
$ python3 -m venv dbt_venv
$ source dbt_venv/bin/activate
$ python3 -m pip install --upgrade pip

Configure the last version of AWS CLI

$ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install

Install boto3 package

$ sudo yum install gcc krb5-devel.x86_64 python3-devel.x86_64 -y
$ pip3 install —upgrade boto3

Install the package:

$ pip3 install dbt-glue

Example config

profiles.yml
type: glue
query-comment: This is a glue dbt example
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: us-east-1
workers: 2
worker_type: G.1X
idle_timeout: 10
schema: "dbt_demo"
session_provisioning_timeout_in_seconds: 120
location: "s3://dbt_demo_bucket/dbt_demo_data"

The table below describes all the options.

OptionDescriptionMandatory
project_nameThe dbt project name. This must be the same as the one configured in the dbt project.yes
typeThe driver to use.yes
query-commentA string to inject as a comment in each query that dbt runs.no
role_arnThe ARN of the glue interactive session IAM role.yes
regionThe AWS Region where you run the data pipeline.yes
workersThe number of workers of a defined workerType that are allocated when a job runs.yes
worker_typeThe type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X.yes
schemaThe schema used to organize data stored in Amazon S3.Additionally, is the database in AWS Lake Formation that stores metadata tables in the Data Catalog.yes
session_provisioning_timeout_in_secondsThe timeout in seconds for AWS Glue interactive session provisioning.yes
locationThe Amazon S3 location of your target data.yes
query_timeout_in_minutesThe timeout in minutes for a single query. Default is 300no
idle_timeoutThe AWS Glue session idle timeout in minutes. (The session stops after being idle for the specified amount of time)no
glue_versionThe version of AWS Glue for this session to use. Currently, the only valid options are 2.0 and 3.0. The default value is 3.0.no
security_configurationThe security configuration to use with this session.no
connectionsA comma-separated list of connections to use in the session.no
confSpecific configuration used at the startup of the Glue Interactive Session (arg --conf)no
extra_py_filesExtra python Libs that can be used by the interactive session.no
delta_athena_prefixA prefix used to create Athena-compatible tables for Delta tables (if not specified, then no Athena-compatible table will be created)no
tagsThe map of key-value pairs (tags) belonging to the session. Ex: KeyName1=Value1,KeyName2=Value2no
seed_formatBy default parquet, can be Spark format compatible like csv or jsonno
seed_modeBy default overwrite, the seed data will be overwritten, you can set it to append if you just want to add new data in your datasetno
default_argumentsThe map of key-value pairs parameters belonging to the session. More information on Job parameters used by AWS Glue. Ex: --enable-continuous-cloudwatch-log=true,--enable-continuous-log-filter=trueno
glue_session_idre-use the glue-session to run multiple dbt run commands: set a glue session id you need to useno
glue_session_reuseReuse the glue-session to run multiple dbt run commands: If set to true, the glue session will not be closed for re-use. If set to false, the session will be closedno
datalake_formatsThe ACID data lake format that you want to use if you are doing merge, can be hudi, ìceberg or deltano

Configs

Configuring tables

When materializing a model as table, you may include several optional configs that are specific to the dbt-spark plugin, in addition to the standard model configs.

OptionDescriptionRequired?Example
file_formatThe file format to use when creating tables (parquet, csv, json, text, jdbc or orc).Optionalparquet
partition_byPartition the created table by the specified columns. A directory is created for each partition.Optionaldate_day
clustered_byEach partition in the created table will be split into a fixed number of buckets by the specified columns.Optionalcountry_code
bucketsThe number of buckets to create while clusteringRequired if clustered_by is specified8
custom_locationBy default, the adapter will store your data in the following path: location path/schema/table. If you don't want to follow that default behaviour, you can use this parameter to set your own custom location on S3Nos3://mycustombucket/mycustompath
hudi_optionsWhen using file_format hudi, gives the ability to overwrite any of the default configuration options.Optional{'hoodie.schema.on.read.enable': 'true'}

Incremental models

dbt seeks to offer useful and intuitive modeling abstractions by means of its built-in configurations and materializations.

For that reason, the dbt-glue plugin leans heavily on the incremental_strategy config. This config tells the incremental materialization how to build models in runs beyond their first. It can be set to one of three values:

  • append (default): Insert new records without updating or overwriting any existing data.
  • insert_overwrite: If partition_by is specified, overwrite partitions in the table with new data. If no partition_by is specified, overwrite the entire table with new data.
  • merge (Apache Hudi and Apache Iceberg only): Match records based on a unique_key; update old records, and insert new ones. (If no unique_key is specified, all new data is inserted, similar to append.)

Each of these strategies has its pros and cons, which we'll discuss below. As with any model config, incremental_strategy may be specified in dbt_project.yml or within a model file's config() block.

Notes: The default strategy is insert_overwrite

The append strategy

Following the append strategy, dbt will perform an insert into statement with all new data. The appeal of this strategy is that it is straightforward and functional across all platforms, file types, connection methods, and Apache Spark versions. However, this strategy cannot update, overwrite, or delete existing data, so it is likely to insert duplicate records for many data sources.

Source code

{{ config(
materialized='incremental',
incremental_strategy='append',
) }}

-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}

Run Code

create temporary view spark_incremental__dbt_tmp as

select * from analytics.events

where event_ts >= (select max(event_ts) from {{ this }})

;

insert into table analytics.spark_incremental
select `date_day`, `users` from spark_incremental__dbt_tmp

The insert_overwrite strategy

This strategy is most effective when specified alongside a partition_by clause in your model config. dbt will run an atomic insert overwrite statement that dynamically replaces all partitions included in your query. Be sure to re-select all of the relevant data for a partition when using this incremental strategy.

If no partition_by is specified, then the insert_overwrite strategy will atomically replace all contents of the table, overriding all existing data with only the new records. The column schema of the table remains the same, however. This can be desirable in some limited circumstances since it minimizes downtime while the table contents are overwritten. The operation is comparable to running truncate + insert on other databases. For atomic replacement of Delta-formatted tables, use the table materialization (which runs create or replace) instead.

Source Code

{{ config(
materialized='incremental',
partition_by=['date_day'],
file_format='parquet'
) }}

/*
Every partition returned by this query will be overwritten
when this model runs
*/

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
date_day,
count(*) as users

from events
group by 1

Run Code

create temporary view spark_incremental__dbt_tmp as

with new_events as (

select * from analytics.events


where date_day >= date_add(current_date, -1)


)

select
date_day,
count(*) as users

from events
group by 1

;

insert overwrite table analytics.spark_incremental
partition (date_day)
select `date_day`, `users` from spark_incremental__dbt_tmp

Specifying insert_overwrite as the incremental strategy is optional since it's the default strategy used when none is specified.

The merge strategy

Compatibility:

  • Hudi : OK
  • Delta Lake : OK
  • Iceberg : OK
  • Lake Formation Governed Tables : On going

NB:

  • For Glue 3: you have to set up a Glue connectors.

  • For Glue 4: use the datalake_formats option in your profile.yml

When using a connector be sure that your IAM role has these policies:

{
"Sid": "access_to_connections",
"Action": [
"glue:GetConnection",
"glue:GetConnections"
],
"Resource": [
"arn:aws:glue:<region>:<AWS Account>:catalog",
"arn:aws:glue:<region>:<AWS Account>:connection/*"
],
"Effect": "Allow"
}

and that the managed policy AmazonEC2ContainerRegistryReadOnly is attached. Be sure that you follow the getting started instructions here.

This blog post also explains how to set up and works with Glue Connectors

Hudi

Usage notes: The merge with Hudi incremental strategy requires:

  • To add file_format: hudi in your table configuration
  • To add a datalake_formats in your profile : datalake_formats: hudi
    • Alternatively, to add a connection in your profile: connections: name_of_your_hudi_connector
  • To add Kryo serializer in your Interactive Session Config (in your profile): conf: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false

dbt will run an atomic merge statement which looks nearly identical to the default merge behavior on Snowflake and BigQuery. If a unique_key is specified (recommended), dbt will update old records with values from new records that match the key column. If a unique_key is not specified, dbt will forgo match criteria and simply insert all new records (similar to append strategy).

Profile config example

test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
conf: spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
datalake_formats: hudi

Source Code example

{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='user_id',
file_format='hudi',
hudi_options={
'hoodie.datasource.write.precombine.field': 'eventtime',
}
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Delta

You can also use Delta Lake to be able to use merge feature on tables.

Usage notes: The merge with Delta incremental strategy requires:

  • To add file_format: delta in your table configuration
  • To add a datalake_formats in your profile : datalake_formats: delta
    • Alternatively, to add a connection in your profile: connections: name_of_your_delta_connector
  • To add the following config in your Interactive Session Config (in your profile): conf: "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

Athena: Athena is not compatible by default with delta tables, but you can configure the adapter to create Athena tables on top of your delta table. To do so, you need to configure the two following options in your profile:

  • For Delta Lake 2.1.0 supported natively in Glue 4.0: extra_py_files: "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-2.1.0.jar"
  • For Delta Lake 1.0.0 supported natively in Glue 3.0: extra_py_files: "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-1.0.0.jar"
  • delta_athena_prefix: "the_prefix_of_your_choice"
  • If your table is partitioned, then the addition of new partition is not automatic, you need to perform an MSCK REPAIR TABLE your_delta_table after each new partition adding

Profile config example

test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
datalake_formats: delta
conf: "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
extra_py_files: "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-2.1.0.jar"
delta_athena_prefix: "delta"

Source Code example

{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='user_id',
partition_by=['dt'],
file_format='delta'
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen,
current_date() as dt

from events
group by 1

Iceberg

Usage notes: The merge with Iceberg incremental strategy requires:

  • To attach the AmazonEC2ContainerRegistryReadOnly Manged policy to your execution role :
  • To add the following policy to your execution role to enable commit locking in a dynamodb table (more info here). Note that the DynamoDB table specified in the resource field of this policy should be the one that is mentioned in your dbt profiles (--conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable). By default, this table is named myGlueLockTable and is created automatically (with On-Demand Pricing) when running a dbt-glue model with Incremental Materialization and Iceberg file format. If you want to name the table differently or to create your own table without letting Glue do it on your behalf, please provide the iceberg_glue_commit_lock_table parameter with your table name (eg. MyDynamoDbTable) in your dbt profile.
iceberg_glue_commit_lock_table: "MyDynamoDbTable"
  • the latest connector for iceberg in AWS marketplace uses Ver 0.14.0 for Glue 3.0, and Ver 1.2.1 for Glue 4.0 where Kryo serialization fails when writing iceberg, use "org.apache.spark.serializer.JavaSerializer" for spark.serializer instead, more info here

Make sure you update your conf with --conf spark.sql.catalog.glue_catalog.lock.table=<YourDynamoDBLockTableName> and, you change the below iam permission with your correct table name.

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CommitLockTable",
"Effect": "Allow",
"Action": [
"dynamodb:CreateTable",
"dynamodb:BatchGetItem",
"dynamodb:BatchWriteItem",
"dynamodb:ConditionCheckItem",
"dynamodb:PutItem",
"dynamodb:DescribeTable",
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:Query",
"dynamodb:UpdateItem"
],
"Resource": "arn:aws:dynamodb:<AWS_REGION>:<AWS_ACCOUNT_ID>:table/myGlueLockTable"
}
]
}
  • To add file_format: Iceberg in your table configuration
  • To add a datalake_formats in your profile : datalake_formats: iceberg
    • Alternatively, to add connections in your profile: connections: name_of_your_iceberg_connector (
      • For Athena version 3:
        • The adapter is compatible with the Iceberg Connector from AWS Marketplace with Glue 3.0 as Fulfillment option and 0.14.0 (Oct 11, 2022) as Software version)
        • the latest connector for iceberg in AWS marketplace uses Ver 0.14.0 for Glue 3.0, and Ver 1.2.1 for Glue 4.0 where Kryo serialization fails when writing iceberg, use "org.apache.spark.serializer.JavaSerializer" for spark.serializer instead, more info here
      • For Athena version 2: The adapter is compatible with the Iceberg Connector from AWS Marketplace with Glue 3.0 as Fulfillment option and 0.12.0-2 (Feb 14, 2022) as Software version)
  • To add the following config in your Interactive Session Config (in your profile):
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.sql.warehouse=s3://<your-bucket-name>
--conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
--conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
--conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager
--conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  • For Glue 3.0, set spark.sql.catalog.glue_catalog.lock-impl to org.apache.iceberg.aws.glue.DynamoLockManager instead

dbt will run an atomic merge statement which looks nearly identical to the default merge behavior on Snowflake and BigQuery. You need to provide a unique_key to perform merge operation otherwise it will fail. This key is to provide in a Python list format and can contains multiple column name to create a composite unique_key.

Notes
  • When using a custom_location in Iceberg, avoid to use final trailing slash. Adding a final trailing slash lead to an un-proper handling of the location, and issues when reading the data from query engines like Trino. The issue should be fixed for Iceberg version > 0.13. Related Github issue can be find here.
  • Iceberg also supports insert_overwrite and append strategies.
  • The warehouse conf must be provided, but it's overwritten by the adapter location in your profile or custom_location in model configuration.
  • By default, this materialization has iceberg_expire_snapshots set to 'True', if you need to have historical auditable changes, set: iceberg_expire_snapshots='False'.
  • Currently, due to some dbt internal, the iceberg catalog used internally when running glue interactive sessions with dbt-glue has a hardcoded name glue_catalog. This name is an alias pointing to the AWS Glue Catalog but is specific to each session. If you want to interact with your data in another session without using dbt-glue (from a Glue Studio notebook, for example), you can configure another alias (ie. another name for the Iceberg Catalog). To illustrate this concept, you can set in your configuration file :
--conf spark.sql.catalog.RandomCatalogName=org.apache.iceberg.spark.SparkCatalog

And then run in an AWS Glue Studio Notebook a session with the following config:

--conf spark.sql.catalog.AnotherRandomCatalogName=org.apache.iceberg.spark.SparkCatalog

In both cases, the underlying catalog would be the AWS Glue Catalog, unique in your AWS Account and Region, and you would be able to work with the exact same data. Also make sure that if you change the name of the Glue Catalog Alias, you change it in all the other --conf where it's used:

 --conf spark.sql.catalog.RandomCatalogName=org.apache.iceberg.spark.SparkCatalog 
--conf spark.sql.catalog.RandomCatalogName.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
...
--conf spark.sql.catalog.RandomCatalogName.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager
  • A full reference to table_properties can be found here.
  • Iceberg Tables are natively supported by Athena. Therefore, you can query tables created and operated with dbt-glue adapter from Athena.
  • Incremental Materialization with Iceberg file format supports dbt snapshot. You are able to run a dbt snapshot command that queries an Iceberg Table and create a dbt fashioned snapshot of it.

Profile config example

test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
datalake_formats: iceberg
conf: --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.warehouse=s3://aws-dbt-glue-datalake-1234567890-eu-west-1/dbt_test_project --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager --conf spark.sql.catalog.glue_catalog.lock.table=myGlueLockTable --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Source Code example

{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['user_id'],
file_format='iceberg',
iceberg_expire_snapshots='False',
partition_by=['status']
table_properties={'write.target-file-size-bytes': '268435456'}
) }}

with new_events as (

select * from {{ ref('events') }}

{% if is_incremental() %}
where date_day >= date_add(current_date, -1)
{% endif %}

)

select
user_id,
max(date_day) as last_seen

from events
group by 1

Iceberg Snapshot source code example


{% snapshot demosnapshot %}

{{
config(
strategy='timestamp',
target_schema='jaffle_db',
updated_at='dt',
file_format='iceberg'
) }}

select * from {{ ref('customers') }}

{% endsnapshot %}

Monitoring your Glue Interactive Session

Monitoring is an important part of maintaining the reliability, availability, and performance of AWS Glue and your other AWS solutions. AWS provides monitoring tools that you can use to watch AWS Glue, identify the required number of workers required for your Glue Interactive Session, report when something is wrong and take action automatically when appropriate. AWS Glue provides Spark UI, and CloudWatch logs and metrics for monitoring your AWS Glue jobs. More information on: Monitoring AWS Glue Spark jobs

Usage notes: Monitoring requires:

  • To add the following IAM policy to your IAM role:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "CloudwatchMetrics",
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*",
"Condition": {
"StringEquals": {
"cloudwatch:namespace": "Glue"
}
}
},
{
"Sid": "CloudwatchLogs",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:/aws-glue/*",
"arn:aws:s3:::bucket-to-write-sparkui-logs/*"
]
}
]
}

Profile config example

test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "4.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
default_arguments: "--enable-metrics=true, --enable-continuous-cloudwatch-log=true, --enable-continuous-log-filter=true, --enable-spark-ui=true, --spark-event-logs-path=s3://bucket-to-write-sparkui-logs/dbt/"

If you want to use the Spark UI, you can launch the Spark history server using a AWS CloudFormation template that hosts the server on an EC2 instance, or launch locally using Docker. More information on Launching the Spark history server

Enabling AWS Glue Auto Scaling

Auto Scaling is available since AWS Glue version 3.0 or later. More information on the following AWS blog post: "Introducing AWS Glue Auto Scaling: Automatically resize serverless computing resources for lower cost with optimized Apache Spark"

With Auto Scaling enabled, you will get the following benefits:

  • AWS Glue automatically adds and removes workers from the cluster depending on the parallelism at each stage or microbatch of the job run.

  • It removes the need for you to experiment and decide on the number of workers to assign for your AWS Glue Interactive sessions.

  • Once you choose the maximum number of workers, AWS Glue will choose the right size resources for the workload.

  • You can see how the size of the cluster changes during the Glue Interactive sessions run by looking at CloudWatch metrics. More information on Monitoring your Glue Interactive Session.

Usage notes: AWS Glue Auto Scaling requires:

  • To set your AWS Glue version 3.0 or later.
  • To set the maximum number of workers (if Auto Scaling is enabled, the workers parameter sets the maximum number of workers)
  • To set the --enable-auto-scaling=true parameter on your Glue Interactive Session Config (in your profile). More information on Job parameters used by AWS Glue

Profile config example

test_project:
target: dev
outputs:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "3.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
default_arguments: "--enable-auto-scaling=true"

Access Glue catalog in another AWS account

In many cases, you may need to run you dbt jobs to read from another AWS account.

Review the following link https://repost.aws/knowledge-center/glue-tables-cross-accounts to set up access policies in source and target accounts

Add the following "spark.hadoop.hive.metastore.glue.catalogid=<AWS-ACCOUNT-ID>" to your conf in the DBT profile, as such, you can have multiple outputs for each of the accounts that you have access to.

Note: The access cross-accounts need to be within the same AWS Region

Profile config example

test_project:
target: dev
outputsAccountB:
dev:
type: glue
query-comment: my comment
role_arn: arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
region: eu-west-1
glue_version: "3.0"
workers: 2
worker_type: G.1X
schema: "dbt_test_project"
session_provisioning_timeout_in_seconds: 120
location: "s3://aws-dbt-glue-datalake-1234567890-eu-west-1/"
conf: "--conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
--conf spark.hadoop.hive.metastore.glue.catalogid=<TARGET-AWS-ACCOUNT-ID-B>"

Persisting model descriptions

Relation-level docs persistence is supported since dbt v0.17.0. For more information on configuring docs persistence, see the docs.

When the persist_docs option is configured appropriately, you'll be able to see model descriptions in the Comment field of describe [table] extended or show table extended in [database] like '*'.

Always schema, never database

Apache Spark uses the terms "schema" and "database" interchangeably. dbt understands database to exist at a higher level than schema. As such, you should never use or set database as a node config or in the target profile when running dbt-glue.

If you want to control the schema/database in which dbt will materialize models, use the schema config and generate_schema_name macro only. For more information, check the dbt documentation about custom schemas.

AWS Lakeformation integration

The adapter supports AWS Lake Formation tags management enabling you to associate existing tags defined out of dbt-glue to database objects built by dbt-glue (database, table, view, snapshot, incremental models, seeds).

  • You can enable or disable lf-tags management via config, at model and dbt-project level (disabled by default)
  • If enabled, lf-tags will be updated on every dbt run. There are table level lf-tags configs and column-level lf-tags configs.
  • You can specify that you want to drop existing database, table column Lake Formation tags by setting the drop_existing config field to True (False by default, meaning existing tags are kept)
  • Please note that if the tag you want to associate with the table does not exist, the dbt-glue execution will throw an error

The adapter also supports AWS Lakeformation data cell filtering.

  • You can enable or disable data-cell filtering via config, at model and dbt-project level (disabled by default)
  • If enabled, data_cell_filters will be updated on every dbt run.
  • You can specify that you want to drop existing table data-cell filters by setting the drop_existing config field to True (False by default, meaning existing filters are kept)
  • You can leverage excluded_columns_names OR columns config fields to perform Column level security as well. Please note that you can use one or the other but not both.
  • By default, if you don't specify any column or excluded_columns, dbt-glue does not perform Column level filtering and let the principal access all the columns.

The below configuration let the specified principal (lf-data-scientist IAM user) access rows that have a customer_lifetime_value > 15 and all the columns specified ('customer_id', 'first_order', 'most_recent_order', 'number_of_orders')

lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'column_names': ['customer_id', 'first_order', 'most_recent_order', 'number_of_orders']
}
},
}
}

The below configuration let the specified principal (lf-data-scientist IAM user) access rows that have a customer_lifetime_value > 15 and all the columns except the one specified ('first_name')

lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'excluded_column_names': ['first_name']
}
},
}
}

See below some examples of how you can integrate LF Tags management and data cell filtering to your configurations :

At model level

This way of defining your Lakeformation rules is appropriate if you want to handle the tagging and filtering policy at object level. Remember that it overrides any configuration defined at dbt-project level.

{{ config(
materialized='incremental',
unique_key="customer_id",
incremental_strategy='append',
lf_tags_config={
'enabled': true,
'drop_existing' : False,
'tags_database':
{
'name_of_my_db_tag': 'value_of_my_db_tag'
},
'tags_table':
{
'name_of_my_table_tag': 'value_of_my_table_tag'
},
'tags_columns': {
'name_of_my_lf_tag': {
'value_of_my_tag': ['customer_id', 'customer_lifetime_value', 'dt']
}}},
lf_grants={
'data_cell_filters': {
'enabled': True,
'drop_existing' : True,
'filters': {
'the_name_of_my_filter': {
'row_filter': 'customer_lifetime_value>15',
'principals': ['arn:aws:iam::123456789:user/lf-data-scientist'],
'excluded_column_names': ['first_name']
}
},
}
}
) }}

select
customers.customer_id,
customers.first_name,
customers.last_name,
customer_orders.first_order,
customer_orders.most_recent_order,
customer_orders.number_of_orders,
customer_payments.total_amount as customer_lifetime_value,
current_date() as dt

from customers

left join customer_orders using (customer_id)

left join customer_payments using (customer_id)

At dbt-project level

This way you can specify tags and data filtering policy for a particular path in your dbt project (eg. models, seeds, models/model_group1, etc.) This is especially useful for seeds, for which you can't define configuration in the file directly.

seeds:
+lf_tags_config:
enabled: true
tags_table:
name_of_my_table_tag: 'value_of_my_table_tag'
tags_database:
name_of_my_database_tag: 'value_of_my_database_tag'
models:
+lf_tags_config:
enabled: true
drop_existing: True
tags_database:
name_of_my_database_tag: 'value_of_my_database_tag'
tags_table:
name_of_my_table_tag: 'value_of_my_table_tag'

Tests

To perform a functional test:

  1. Install dev requirements:
$ pip3 install -r dev-requirements.txt
  1. Install dev locally
$ python3 setup.py build && python3 setup.py install_lib
  1. Export variables
$ export DBT_S3_LOCATION=s3://mybucket/myprefix
$ export DBT_ROLE_ARN=arn:aws:iam::1234567890:role/GlueInteractiveSessionRole
  1. Run the test
$ python3 -m pytest tests/functional

For more information, check the dbt documentation about testing a new adapter.

Caveats

Supported Functionality

Most dbt Core functionality is supported, but some features are only available with Apache Hudi.

Apache Hudi-only features:

  1. Incremental model updates by unique_key instead of partition_by (see merge strategy)

Some dbt features, available on the core adapters, are not yet supported on Glue:

  1. Persisting column-level descriptions as database comments
  2. Snapshots
0