Monday, October 5, 2020

Adding data to Azure Synapse table from Azure Databricks

Recently I put together a prototype that uses Python code in Azure Databricks to clean up data and then efficiently insert it into Azure Synapse Analytics (previously known as Azure SQL Data Warehouse) tables. This post documents the relevant context and then the technical detail of how to do it. There is plenty of published documentation on this integration, but I wanted to also introduce the two main mechanisms for it (PolyBase and the COPY statement) and, specifically, to append rows to existing Synapse tables instead of rewriting them, which is what you find in most links.

Relevant links

  • Use virtual network service endpoints and rules for servers in Azure SQL Database (Microsoft docs)
    • This page covers the case where your data lake (Azure Storage) has access restricted to a VNet, and the steps in the section "Azure Synapse PolyBase and COPY statement" also give you some relevant context: "PolyBase and the COPY statement is commonly used to load data into Azure Synapse Analytics from Azure Storage accounts for high throughput data ingestion. If the Azure Storage account that you are loading data from limits access only to a set of VNet-subnets, connectivity when using PolyBase and the COPY statement to the storage account will break". Further down, the example goes into creating external tables, which is not something you actually need to create when you do it from Databricks with the COPY statement.
  • Azure Synapse Analytics (Databricks documentation)
    • This is perhaps the most complete page in terms of explaining how this works, but also the most complex. It again covers PolyBase and the COPY statement and includes code, but the code provided creates a new table instead of adding to an existing one. If you use the provided code to write to an existing table, it will actually overwrite the table and its schema.
  • Loading from Azure Data Lake Store Gen 2 into Azure Synapse Analytics (Azure SQL DW) via Azure Databricks (medium post)
    • A good post, simpler to understand than the Databricks one, and including info on how to use OAuth 2.0 with Azure Storage instead of the storage key. Again, the code overwrites data/rewrites existing Synapse tables.
  • Tutorial: Extract, transform, and load data by using Azure Databricks (Microsoft docs)
    • Finally, this is a step-by-step tutorial of the end-to-end process. It uses Scala instead of Python and, again, overwrites the destination tables. I followed the steps from here, specifically the section "Load data into Azure Synapse", and then made some modifications.

PolyBase and the COPY statement

The two fastest ways to insert data into Synapse tables are PolyBase and the new COPY statement. PolyBase is the older technology, and COPY is its intended replacement: it requires fewer permissions and is simpler to use - with PolyBase you may need to create Synapse external tables, for example, which COPY does not require.

The Databricks documentation page above includes an overview of the differences:

«In addition to PolyBase, the Azure Synapse connector supports the COPY statement. The COPY statement offers a more convenient way of loading data into Azure Synapse without the need to create an external table, requires fewer permissions to load data, and provides an improved performance for high-throughput data ingestion into Azure Synapse.» 

Right after that, it shows the configuration setting that controls whether PolyBase or COPY is used:

spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

Here you can use "polybase" or "copy" as the value to control how the data writes into Synapse are done. If you leave the line out, as I did in my code below, the default when using Azure Storage Gen2 + Databricks Runtime > 7.0 is "copy".

Finally, I specifically wanted to run the full ETL pipeline from Databricks, but the data transfer could also be orchestrated from Azure Data Factory. This documentation page goes into detail on how to do that, using either of the alternatives above.

Code details

The example code I found typically overwrote any existing table instead of adding records to it, and most of the examples use PolyBase with external tables. In my scenario, the tables are already created in Synapse and the Spark data frames are filled with processed data; I just need to insert the rows into the destination tables. So what are the steps?

First, I used the code in the tutorial linked above to set up auxiliary variables, converted from Scala to Python. This could be much more compact, but I kept the original format for clarity:

Setup code
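A sketch of that setup, converted from the tutorial's Scala to Python. All account, container, server, and database names below are placeholders I made up for illustration - substitute your own values:

```python
# Auxiliary variables for the Databricks-to-Synapse transfer.
# All names here (mystorageaccount, staging, mysynapseserver, mydw)
# are placeholders -- replace them with your own resources.
blob_storage = "mystorageaccount.blob.core.windows.net"
blob_container = "staging"
blob_access_key = "<storage-access-key>"

# Staging location the connector uses to move data between Databricks and Synapse
temp_dir = "wasbs://{}@{}/tempDirs".format(blob_container, blob_storage)

# In the Databricks notebook, also give the Spark session the storage key:
# spark.conf.set("fs.azure.account.key." + blob_storage, blob_access_key)

dw_server = "mysynapseserver"    # Synapse (SQL DW) logical server name
dw_database = "mydw"
dw_user = "<user>"
dw_password = "<password>"

# JDBC connection string for the Synapse database
sql_dw_url = (
    "jdbc:sqlserver://{}.database.windows.net:1433;database={};"
    "user={};password={};encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
).format(dw_server, dw_database, dw_user, dw_password)
```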

And then simply insert the data in my data frame into the target table in Azure Synapse:
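The write itself is a sketch along these lines, assuming the variables from the setup step (sql_dw_url, temp_dir), a cleaned Spark DataFrame df, and a placeholder table name "dbo.MyTable":

```python
# Append df's rows to an existing Azure Synapse table via the
# com.databricks.spark.sqldw connector. The table name below is a placeholder.
def write_to_synapse(df, sql_dw_url, table_name, temp_dir):
    """Append the DataFrame's rows to an existing Synapse table."""
    (df.write
       .format("com.databricks.spark.sqldw")
       .option("url", sql_dw_url)
       .option("forwardSparkAzureStorageCredentials", "true")
       .option("dbTable", table_name)
       .option("tempDir", temp_dir)
       .mode("append")   # "append" adds rows; "overwrite" recreates the table
       .save())

# In the notebook:
# write_to_synapse(df, sql_dw_url, "dbo.MyTable", temp_dir)
```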

In the call above, the first parameter of every option() is a keyword, so you really just need to change the database connection string (the "url" value) and the name of the table (the "dbTable" value). Also note that the connection string is not actually used to send the data to Synapse; it is used to tell Synapse to go fetch the data from Azure Storage Gen2.

And that's it. When you run this, Spark reports "Waiting for Azure Synapse Analytics to load intermediate data from wasbs://..... into "my-table-in-db" using COPY", and the data shows up in the table. The code above does not require any external tables, and by using append mode you add rows to the table, while overwrite creates the table from scratch (including replacing its schema with that of the data frame).

One caveat to the above -- it's your responsibility to make sure the types in your Spark data frame match those in your Azure Synapse table definition. For example, I had some columns as IntegerType() in my Spark data frame, which can't be inserted into TINYINT/SMALLINT columns, so the append failed.
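A hypothetical fix for that mismatch is to cast the offending column down before the write so the frame's schema matches the Synapse column. The column name "status_code" below is made up; a Synapse SMALLINT corresponds to Spark's short type:

```python
# Cast one DataFrame column to a given Spark SQL type name so the schema
# matches the Synapse table definition. "status_code" is a placeholder.
def cast_for_synapse(df, column, target_type):
    """Return df with the named column cast to target_type (e.g. "smallint")."""
    return df.withColumn(column, df[column].cast(target_type))

# In the notebook, before the append:
# df = cast_for_synapse(df, "status_code", "smallint")
```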

One final note - while some parts of the documentation refer to COPY as being in preview, it actually came out of preview on September 23rd.