Databricks as a Target

Introduction

Databricks is a cloud-native SQL analytics engine that provides high-performance, scalable analytics for data lakes, data warehouses, and other big data use cases.
In this document, we will discuss how to use Databricks SQL as a target in Rivery.

Prerequisites

  1. A valid Databricks Account or Workspace.
  2. A Databricks connection in Rivery.

Unity Catalog

Databricks Unity Catalog is a feature of the Databricks Unified Data Analytics Platform that allows organizations to easily manage, discover, and collaborate on their data assets. By offering consolidated access control, tracking, lineage, and data exploration functions throughout Databricks workspaces, Unity Catalog simplifies the management of data assets.
In this section, we will cover how to work with Databricks Unity Catalog.

Catalog Prerequisite

Create Your First Metastore and Attach a Workspace

In order to use the Unity Catalog, it is necessary to first create a metastore. A metastore is a centralized repository where metadata is stored and managed. This metadata includes information such as data types, file formats, and other characteristics that help users locate and use the data they need.

After creating the metastore, the next step is to link it to a workspace. Connecting the metastore to a workspace enables you to conveniently manage and access your data using the Unity Catalog.

To create a Metastore and link it to a Workspace, follow the instructions below:

  1. Access the Databricks account console and log in.

  2. Navigate to the Catalog icon and click on it.


  3. Select the option Create metastore.


  4. Provide a name for the Metastore and specify the Region where it will be deployed. It is recommended to place the workspaces, metastore, and cloud storage location in the same cloud region for optimal performance.

  5. Enter the S3 bucket path for the storage bucket and IAM role name.
    Note that you can exclude "s3://" when entering the path.

  6. Click the Create button to complete the process.


  7. Once prompted, select the workspaces you want to associate with the metastore and click Assign.


Enable the Unity Catalog Toggle in Your SQL Warehouse

  1. Log in to your Databricks workspace.
  2. Locate the button labeled SQL Warehouse and click it.


  3. Find the warehouse you are working with, click the 3 dots icon, and select the Edit option.


  4. In the Advanced options section, enable the Unity Catalog toggle.


How to Work With Unity Catalog

Please Note:
  • You can access all catalogs in the current Metastore if you are a Metastore admin; otherwise, you can only access catalogs where you have ownership or the USAGE privilege.
  • If no Catalog name is specified, Rivery will automatically load into the default hive_metastore catalog.
  1. Ensure that all prerequisites have been met.
  2. Make sure you have a valid Databricks connection in Rivery.
  3. Input the name of the Catalog.

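For illustration, here is a minimal, hedged sketch (the catalog, schema, and table names below are hypothetical) of how the Catalog setting maps to Databricks' three-level namespace:

-- With the Catalog field set to "analytics", objects resolve inside that catalog:
USE CATALOG analytics;
SELECT * FROM analytics.rivery.orders LIMIT 10;

-- With no Catalog specified, the same schema and table resolve in the default hive_metastore catalog:
SELECT * FROM hive_metastore.rivery.orders LIMIT 10;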


Schemas and Tables

Databricks is a data warehousing platform that relies on a hierarchy of schemas and tables. The conventional approach to working with Databricks involves creating tables beforehand and then populating them with data. This process can be time-consuming and may require advanced technical skills.

However, Rivery eliminates the need for this upfront table creation process. Rivery will automatically create the necessary tables if they don't already exist and populate them with the extracted data. Additionally, Rivery handles all the required changes and updates to your tables automatically.
This simplifies your data integration process and helps you focus on extracting insights from your data.
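As a rough illustration only (the table and columns below are hypothetical, not Rivery's generated DDL), the automatic table creation corresponds to statements of this form:

-- Create the table only if it does not already exist; Rivery handles this automatically.
CREATE TABLE IF NOT EXISTS rivery.orders (
  order_id   BIGINT,
  order_date DATE,
  status     STRING
);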

Create a New Schema

It is advisable to set up a dedicated Schema for Rivery on Databricks.
In order to do so, proceed with the following steps:

  1. Go to the Databricks workspace console, click on Queries, and then select Create Query.

  2. In the query editor, paste and run the following SQL command:

CREATE SCHEMA IF NOT EXISTS `rivery`
COMMENT 'Rivery Tables Schema';
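
Optionally, you can verify that the Schema was created by running a quick check in the same query editor:

-- Confirm the schema exists and show its comment and properties.
DESCRIBE SCHEMA EXTENDED `rivery`;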

Loading Modes

Rivery provides 3 types of loading modes in Databricks as a Target.

1. Overwrite

2. Append-Only

3. Upsert-Merge

Rivery offers 2 distinct options when the Upsert-Merge loading mode is selected in Databricks.

The Upsert-Merge option is used to update existing data in a table that may have changed in the source since the last run, as well as insert new records. While both Upsert-Merge options produce the same result, they differ in the backend process and have different performance implications.


Please Note:

When performing an upsert-merge using a non-unique primary key, the data is appended to the Target instead of removing duplicate records.

Switch-Merge

The Switch Merge method in Databricks is a way of merging data from two or more tables or streams into a single table or stream. It works by creating a new table or stream that contains the data from the other tables, and then switching the references to the new table. This can be useful for a number of purposes, such as:

Updating data in a table: By using the switch merge method, you can easily update the data in a table by replacing it with new data from another table.

Consolidating data from multiple tables: If you have data spread across multiple tables, the Switch Merge method can be used to consolidate it into a single table.

Performing "Upsert" operations: The Switch Merge method can be used to perform "upsert" operations, which insert new rows into a table or stream if they don't already exist, or update existing rows if they do.

Switch-Merge Method Flowchart


The Steps

  1. Select target table and merge table: The target table, which will be modified by the merge, and the merge table, which contains the data to be merged, are selected.
  2. Create temporary table: A temporary table is created to hold the merged data.
  3. Insert merged data into temporary table: The data from the merge table is inserted into the temporary table.
  4. Swap target table and temporary table: The original target table is swapped with the temporary table containing the merged data.
  5. Check for conflicts in merged data: Any conflicts or errors in the merged data are identified.
  6. Resolve conflicts in merged data: Any conflicts or errors are resolved.
  7. Drop temporary table: The temporary table is no longer needed, so it is dropped.
  8. Finish: The process is completed and the merged table is now the current table.

Example
In the Switch-Merge method, rather than merging rows directly into the target table, a new table is built that combines the unchanged rows from the current target table with the incoming rows from the staging (source) table; the target reference is then switched to this new table.

Here is an example of how the new table could be built:

CREATE TABLE <tmp_target_table>
AS
SELECT col1, col2, col3, col4
FROM (
  SELECT trgt.col1 as col1,
         null as col2 /* In case the field was dropped from the mapping */,
         trgt.col3 as col3,
         trgt.col4 as col4
  FROM <trgt_table> as trgt
  WHERE NOT EXISTS (
    SELECT 1 FROM <tmp_src_table> src_ WHERE hash(trgt.key) = src_.gk /* gk created in advance */
  )
  UNION ALL
  SELECT src.col1 as col1,
         src.col2 as col2,
         null as col3 /* In case the field doesn't exist in the target */,
         src.col4 as col4
  FROM <tmp_src_table> src
);

In this example, the new table <tmp_target_table> is built from two sets of rows: rows from the existing target table <trgt_table> that have no matching key in the staging table <tmp_src_table> (the unchanged rows), unioned with all rows from the staging table (the new and updated rows). The new table then replaces the original target table.
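
A hedged sketch of the final "switch" step that follows (the table names are placeholders, not Rivery's actual generated statements):

-- Swap the rebuilt table in as the new target, then clean up.
ALTER TABLE <trgt_table> RENAME TO <trgt_table_old>;
ALTER TABLE <tmp_target_table> RENAME TO <trgt_table>;
DROP TABLE <trgt_table_old>;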

Merge

The MERGE command lets you update existing rows in a table or insert new rows, depending on whether a matching row already exists.

Merge Method Flowchart

The Steps

  1. Identify the Target Table: This is the table that the data from the source table will be merged into.
  2. Find the source table: This is the table that contains the data that will be merged into the target table.
  3. Determine the merge condition: This is the condition that specifies which rows from the source table should be merged into the target table.
  4. Execute the merge: This step performs the actual merge operation, using the target table, source table, and merge condition specified in the previous steps. The merge operation will insert any rows from the source table that do not exist in the target table and update any rows in the target table that match the merge condition with the corresponding rows from the source table.

Example
Here is an example of how to use the Merge method:

MERGE into WAREHOUSE.SCHEMA.RIVER_EXECUTIONS_TM trgt_                    
    USING WAREHOUSE.SCHEMA.tmp_RIVER_EXECUTIONS_TM_8665e1 src_                    
    ON trgt_.run_id = src_.run_id                    
    WHEN MATCHED THEN                      
        update set trgt_.river_id = src_.river_id,
            trgt_.task_id = src_.task_id,
            trgt_.run_id = src_.run_id,
            trgt_.status = src_.status,
            trgt_.current_files = src_.current_files,
            trgt_.total_size = src_.total_size,
            trgt_.total_files = src_.total_files,
            trgt_.row_update_date = src_.row_update_date,
            trgt_.run_date = src_.run_date,
            trgt_.run_end_date = src_.run_end_date,
            trgt_.account = src_.account,
            trgt_.error_description = src_.error_description,
            trgt_.task_ordinal = src_.task_ordinal,
            trgt_.last_task_activity_id = src_.last_task_activity_id,
            trgt_.datasource_type = src_.datasource_type,
            trgt_.is_hidden = src_.is_hidden,
            trgt_.logic_steps = src_.logic_steps,
            trgt_.units = src_.units,
            trgt_.is_cancel = src_.is_cancel,
            trgt_.target_name = src_.target_name,
            trgt_.source_name = src_.source_name,
            trgt_.scheduler_id = src_.scheduler_id,
            trgt_.is_multi = src_.is_multi,
            trgt_.total_rows = src_.total_rows,
            trgt_.total_tables = src_.total_tables,
            trgt_.env_id = src_.env_id                    
    WHEN NOT MATCHED THEN                      
        INSERT (
            river_id,
            task_id,
            run_id,
            status,
            current_files,
            total_size,
            total_files,
            row_update_date,
            run_date,
            run_end_date,
            account,
            error_description,
            task_ordinal,
            last_task_activity_id,
            datasource_type,
            is_hidden,logic_steps,
            units,is_cancel,
            target_name,
            source_name,
            scheduler_id,
            is_multi,
            total_rows,
            total_tables,
            env_id
            ) 
        values (
            src_.river_id,
            src_.task_id,
            src_.run_id,
            src_.status,
            src_.current_files,
            src_.total_size,
            src_.total_files,
            src_.row_update_date,
            src_.run_date,
            src_.run_end_date,
            src_.account,
            src_.error_description,
            src_.task_ordinal,
            src_.last_task_activity_id,
            src_.datasource_type,
            src_.is_hidden,
            src_.logic_steps,
            src_.units,
            src_.is_cancel,
            src_.target_name,
            src_.source_name,
            src_.scheduler_id,
            src_.is_multi,
            src_.total_rows,
            src_.total_tables,
            src_.env_id
            )

In this example, the MERGE statement updates all mapped columns in the RIVER_EXECUTIONS_TM target table for rows whose run_id matches a row in the temporary source table. Rows from the source table whose run_id does not exist in the target table are inserted into the target table.

Store in a Custom Location

Rivery provides an option for storing tables in DBFS (Databricks File System) or in an external location.

DBFS

DBFS (Databricks File System) is a distributed file system built into Databricks that provides a scalable and reliable way to store and manage data. With DBFS, users can store data in a variety of file formats, including Parquet, CSV, JSON, and more, and access it directly using Spark SQL commands.

External tables are a powerful feature in Databricks that let users work with data stored in a variety of locations and file formats. Support for DBFS makes it easy to manage and manipulate large datasets while providing the flexibility and scalability that data workloads require.
To specify a DBFS prefix, check the "Store in a Custom Location" box and define the DBFS parameter.
The location is set automatically under the abfss://<some_path>/<some_sub_path> path.


External Location

In addition to DBFS, Rivery provides an option for storing tables in an external location. External tables in Databricks are stored outside of Databricks' file system and can be accessed and manipulated directly. This provides greater flexibility and scalability, as it allows users to work with data stored in a variety of file formats and external sources.

Creating an external table is straightforward. Users can specify an external location prefix when creating a Delta table, by checking the "Store in a Custom Location" box and setting the External Location parameter.
The location will be set automatically under the <storage>://<bucket_name>/<some_path> path.
This makes it easy to work with data stored in external locations, such as Amazon S3 or Azure Blob Storage, without having to move it into Databricks' file system.
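
For reference, here is a hedged sketch (hypothetical schema, table, and path) of the kind of Delta table definition this setting corresponds to:

-- A Delta table whose data files live in an external location such as Amazon S3.
CREATE TABLE IF NOT EXISTS rivery.orders (
  order_id   BIGINT,
  order_date DATE
)
USING DELTA
LOCATION 's3://<bucket_name>/<some_path>/rivery/orders';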


How Rivery Manages Data Loads and Table Metadata

The first step in the loading process is creating the target Schema if it doesn't already exist. Rivery checks and ensures that the Schema is present in the Databricks workspace. If the Schema is not found, Rivery creates the Schema for the user.

Please Note:
The personal access token provided to the user should have sufficient permissions to create new Schemas in the system; otherwise, the source to target process may fail.
The user should contact their Databricks account manager or admin to provide them with a token that has CREATE SCHEMA permissions in Databricks.
If no Schema is provided in the process, the default Schema name is "default".

The next step is staging table loading, which involves creating a table with the "tmp_" prefix and the correct metadata. Rivery loads the data to the "tmp_" table using the COPY command. If the source data format is CSV, the "tmp_" table is created using the fields in the mapping, which are type-converted. If the source data format is JSON, a one-columned table with a string data type is created.
Rivery then flattens the data using JSON Extract Functions (if the source data format is JSON) and casts it with cast([column] as [dataType]) as [alias] syntax.
Rivery also auto-scales integer columns to the right integer data type based on their data length (SMALLINT->INTEGER->BIGINT->NUMERIC). After that, Rivery drops the old "tmp_" table with the json_data field (if it exists).
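
As a hedged illustration of this staging step (the tables, columns, and path below are hypothetical and not Rivery's exact generated SQL), the COPY and cast pattern looks roughly like this:

-- Stage source files into a "tmp_" table with the COPY INTO command (CSV example).
COPY INTO rivery.tmp_orders
FROM 's3://<bucket_name>/<some_path>/orders/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true');

-- For a JSON source, the staged string column is flattened and cast using the
-- cast([column] as [dataType]) as [alias] syntax, for example:
SELECT cast(get_json_object(json_data, '$.order_id') as BIGINT) as order_id,
       cast(get_json_object(json_data, '$.order_date') as DATE) as order_date
FROM rivery.tmp_orders_json;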

In the last stage, Rivery loads data from the staging table to the target table. All the following steps are performed in one transaction:
  1. Rivery defines and updates any metadata changes on the target table, excluding dropping columns.
  2. If the target table does not exist, Rivery creates it using the "CREATE AS SELECT" clause.
  3. If the Append loading mode is selected, Rivery clones the target table and adds/drops/alters columns in the clone table. Rivery then loads the data to the target table using the selected loading mode.
  4. Finally, Rivery uses DEEP CLONE to clone the "tmp_" table into the target table and drops the "tmp_" tables in the process, as sketched below.
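
A hedged sketch of the final clone step (hypothetical names, not Rivery's exact statements):

-- Publish the staged result by cloning it into the target table, then drop the staging table.
CREATE OR REPLACE TABLE rivery.orders DEEP CLONE rivery.tmp_orders;
DROP TABLE IF EXISTS rivery.tmp_orders;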

