Databricks as a Target
Overview
Databricks is a managed service for processing and transforming data on a data lake. It is built on Delta Lake, Databricks' open-source solution for building, managing, and processing data using the Lakehouse architecture.
This article discusses how to configure Databricks as a target in a Data Source To Target River and the options available for it.
Requirements
- A valid Databricks account. If you don't have one, please create one here.
- A valid Databricks workspace. If you don't have any workspace in your Databricks account, please check out the next article.
- A valid Databricks connection. In order to configure one, please check out Setting Up a Connection for Databricks.
Databricks as a Target Configuration
Databases and tables
Databricks is a DWH based on a database.table hierarchy. However, when using Rivery, there is no need to create any table in advance and then populate it. Rivery manages, creates (if they don't exist), populates, and applies the needed changes to your tables automatically.
Create new database
It is recommended to create a new database on Databricks that is dedicated to Rivery. In order to do so:
- On the Databricks console, go to Queries -> +New Query
- On the query screen, paste and run the following SQL command:
CREATE DATABASE IF NOT EXISTS `rivery`
COMMENT 'Rivery Tables Database';
Managed Tables
Databricks provides, and recommends, the use of tables managed by the Databricks engine. Rivery relies mostly on this capability, and manages, pushes, and defines table data and metadata on managed tables under a managed database. Rivery creates and uses managed tables by default.
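As an illustration only (Rivery issues this kind of DDL for you; the table and column names below are hypothetical), a managed Delta table is simply created without a LOCATION clause, leaving storage management to Databricks:
-- Hypothetical example: a managed table has no LOCATION clause,
-- so Databricks manages the underlying storage path.
CREATE TABLE IF NOT EXISTS `rivery`.`orders` (
  order_id BIGINT,
  customer_id BIGINT,
  order_ts TIMESTAMP
) USING DELTA;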
External Tables
Databricks also provides the ability to create, manage, and populate external tables (the "old-fashioned" tables in SparkSQL). You may use an external location prefix to create a Delta table under an external location. To do so, check the Use External Location box and set the externalLocation as you like.
The location will be set automatically under the /{externalLocation}/{schema}/{table} path.
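For illustration, assuming a hypothetical external location mounted at /mnt/my_external_location (Rivery derives the actual path for you), an external Delta table following the pattern above might look like this:
-- Hypothetical example of an external Delta table; the path follows
-- the /{externalLocation}/{schema}/{table} pattern described above.
CREATE TABLE IF NOT EXISTS `rivery`.`orders_external` (
  order_id BIGINT,
  customer_id BIGINT,
  order_ts TIMESTAMP
) USING DELTA
LOCATION '/mnt/my_external_location/rivery/orders_external';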
For further reading about external Delta tables, click here.
Loading Modes
Rivery provides three loading modes when Databricks is the target.
Overwrite
This mode creates an entirely new table in the Databricks database.
Append
This mode uses INSERT INTO the Target Table (matching on columns) and appends any new records, keeping all existing records. If any metadata change is performed during the loading, Rivery will CLONE the target table, perform the metadata changes on the newly cloned table, and then clone the table back. This method provides maximum durability and avoids locks on tables that are currently in use.
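A minimal sketch of the idea in Databricks SQL, assuming hypothetical table and column names (the exact statements Rivery generates may differ):
-- Append new records to the target table, matching on column names.
INSERT INTO `rivery`.`orders` (order_id, customer_id, order_ts)
SELECT order_id, customer_id, order_ts
FROM `rivery`.`tmp_orders`;

-- When metadata changes are needed, a clone of the target can be created,
-- altered, and cloned back; a Delta clone statement looks like this:
CREATE OR REPLACE TABLE `rivery`.`orders_clone`
DEEP CLONE `rivery`.`orders`;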
Upsert-Merge
This loading mode performs a key-based merge of the new data and the data in the target table. Rivery employs the Switch-Merge and Merge methods. Rivery converts all Key columns to strings while mapping in order to support multiple Key types. The final data is unaffected by this, but it could lead to edge cases where two Key values map to the same location.
Examples include array('hello', 'wor', 'ld') and array('hello', 'wor, ld'), which are both mapped to '[hello, wor, ld]'.
Review the Databricks string conversion documentation for more edge cases.
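You can reproduce this collision directly in Databricks SQL; both arrays cast to the same string value:
-- Both expressions return the string '[hello, wor, ld]', so as string-typed
-- Keys they would collide even though the source arrays differ.
SELECT cast(array('hello', 'wor', 'ld') AS STRING) AS key_a,
       cast(array('hello', 'wor, ld') AS STRING) AS key_b;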
Switch-Merge
This method of updating rows in a target table is the default and is typically the fastest. The backend process searches the existing table for rows whose primary Keys match the incoming source data, then builds a temporary table containing the "new data" (the incoming rows) together with the "old data" (rows from the target table whose primary Keys do not match any incoming row). It then "switches" the old table in the target with the temporarily constructed one, leaving the target with updated rows for every matching primary Key.
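A simplified sketch of the pattern in Databricks SQL, assuming a single hypothetical Key column order_id (the real backend process is more involved):
-- 1. Build a temporary table holding the incoming rows plus the old rows
--    whose Keys do not appear in the incoming data.
CREATE OR REPLACE TABLE `rivery`.`tmp_switch_orders` AS
SELECT * FROM `rivery`.`tmp_orders`              -- "new data"
UNION ALL
SELECT t.* FROM `rivery`.`orders` AS t           -- "old data" with no matching Key
LEFT ANTI JOIN `rivery`.`tmp_orders` AS s
  ON t.order_id = s.order_id;

-- 2. "Switch" the old target table with the freshly built one.
CREATE OR REPLACE TABLE `rivery`.`orders`
DEEP CLONE `rivery`.`tmp_switch_orders`;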
Merge
The Merge method uses the native Databricks MERGE INTO command to update rows that already exist in the target table and insert rows that do not. This loading technique keeps the table history in Databricks up to date.
For a merge operation, the source Keys that are loaded into the target table must be unique or true primary Keys. If there is any duplication in the source or target data, the command will fail and highlight the duplicate row in the error message.
To describe it, let's use the following query:
MERGE INTO `{database}`.`{target_table}` as trgt_
USING `{database}`.`{source_table}` as src_
ON {merge_condition}
WHEN MATCHED THEN UPDATE SET {update_set}
WHEN NOT MATCHED THEN INSERT ({insert_columns}) VALUES ({insert_values})
When a record matches another record based on a Key, the statement "WHEN MATCHED THEN UPDATE SET" is executed, updating the entries in the target table.
If any new records are found, the statement "WHEN NOT MATCHED THEN INSERT" is carried out, and the new records are added.
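For example, with a single hypothetical Key column order_id, the rendered statement might look like this:
MERGE INTO `rivery`.`orders` AS trgt_
USING `rivery`.`tmp_orders` AS src_
ON trgt_.order_id = src_.order_id
WHEN MATCHED THEN UPDATE SET
  trgt_.customer_id = src_.customer_id,
  trgt_.order_ts = src_.order_ts
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, order_ts)
VALUES (src_.order_id, src_.customer_id, src_.order_ts);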
How does Rivery handle data loads and table metadata?
Rivery manages table metadata updates alongside handling the data loading.
Here is the complete scheme of the Rivery loading process in the database:
Databricks Mapping Columns
Creating the target database if it does not exist
Rivery checks and makes sure that the database used in the process exists in the Databricks workspace. If the database doesn't exist, Rivery creates it for you.
Your personal access token should include sufficient permissions to create a new database in the system. If it doesn't, the source to target process may fail. Please ask your Databricks account manager or admin to provide you with a token that has sufficient CREATE DATABASE permissions in Databricks.
Rivery creates the database using the following clause:
CREATE DATABASE IF NOT EXISTS `<database>`
COMMENT 'Created By Rivery On YYYY-MM-DDTHH:mm:SS';
Default Database Name
If no database is provided for the process, the default database will be default.
List Databases Created By Rivery
To make sure the database was created by Rivery, run the following query:
DESCRIBE DATABASE EXTENDED <database name>;
In the results, you should see the following description:
|----------------------------|---------------------------------|
|database_description_item | database_description_value |
|----------------------------|---------------------------------|
|Database Name | <database_name> |
|Description | Created by Rivery at YYYY-MM-DD |
|----------------------------|---------------------------------|
Staging Table Loading
- Creating a table with a tmp_ prefix and the correct metadata.
- Loading the data into the tmp_ table using the COPY command. If the source data format is CSV, the tmp_ table is created using the fields in the mapping (type-converted). If it is JSON, a one-column table with a string data type is created (see the sketch after this list).
- Flattening the data (if the source type is JSON) using JSON extract functions and casting with the cast([column] as [dataType]) as [alias] syntax.
- Auto-scaling Integer columns to the right Integer data type based on their data length (SMALLINT->INTEGER->BIGINT->NUMERIC).
- Dropping the old tmp_ table with the json_data field (if it exists).
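As a hedged sketch of these steps for a JSON source (the path, table, and column names are hypothetical; Rivery generates the actual statements):
-- The tmp_ table initially holds the raw JSON payload as a single string column.
CREATE TABLE IF NOT EXISTS `rivery`.`tmp_orders_raw` (json_data STRING) USING DELTA;

-- Load the staged files with COPY INTO (FILEFORMAT = TEXT reads each line as a string).
COPY INTO `rivery`.`tmp_orders_raw`
FROM (SELECT value AS json_data FROM '/mnt/rivery_staging/orders/')
FILEFORMAT = TEXT;

-- Flatten the JSON and cast each column to its mapped data type.
CREATE OR REPLACE TABLE `rivery`.`tmp_orders` AS
SELECT cast(get_json_object(json_data, '$.order_id') AS BIGINT) AS order_id,
       cast(get_json_object(json_data, '$.customer_id') AS BIGINT) AS customer_id,
       cast(get_json_object(json_data, '$.order_ts') AS TIMESTAMP) AS order_ts
FROM `rivery`.`tmp_orders_raw`;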
Load from STG Table to Target Table
In this stage, all of the following steps are performed in one transaction:
- Defining and applying any metadata changes on the target table, excluding dropping columns.
- If the target table does not exist, creating it using a CREATE TABLE AS SELECT clause.
- If the loading mode is Append, cloning the target table aside and adding/dropping/altering columns in the cloned table.
- Loading the data into the target table using the selected loading mode (as described above).
- Using DEEP CLONE to clone the tmp_ table into the target table (see the sketch after this list).
- Dropping the tmp_ tables used in the process.
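A minimal sketch of the final steps, assuming the same hypothetical table names as above:
-- Copy the staged data into the target table with DEEP CLONE
-- (CREATE OR REPLACE also covers the case where the target does not exist yet).
CREATE OR REPLACE TABLE `rivery`.`orders`
DEEP CLONE `rivery`.`tmp_orders`;

-- Drop the tmp_ tables used during the run.
DROP TABLE IF EXISTS `rivery`.`tmp_orders`;
DROP TABLE IF EXISTS `rivery`.`tmp_orders_raw`;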