1.autoloader-from-currents-landing (Python)

Read landing stream from Blob via Auto Loader, parse and write to Delta

We will use Structured Streaming to ingest .avro data from Blob storage via Auto Loader (the source) and persist it to a DBFS mount backed by Azure Data Lake Storage Gen2 in Delta format (the sink).

NOTE: This notebook is parameterized; based on the table name passed in, the pipeline can be reused for multiple tables.

Create a widget for the table name

# Set default
dbutils.widgets.text("table", "users.messages.pushnotification.Send")
 
# Store in variable
table = dbutils.widgets.get("table")
print("Table: {}".format(table))

Import required modules

from datetime import datetime

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

Set configuration objects

# ADLS
adls_account = dbutils.secrets.get(scope="key-vault-secrets", key="adls-storageAccount-Name")
adls_key = dbutils.secrets.get(scope="key-vault-secrets", key="adls-storageAccount-AccessKey")
 
spark.conf.set("fs.azure.account.key." + adls_account + ".blob.core.windows.net", adls_key)
spark.conf.set("fs.azure.account.key." + adls_account + ".dfs.core.windows.net", adls_key)
 
# Blob
blob_account = dbutils.secrets.get(scope="key-vault-secrets", key="blob-storageAccount-Name")
blob_key = dbutils.secrets.get(scope="key-vault-secrets", key="blob-storageAccount-AccessKey")
 
spark.conf.set("fs.azure.account.key." + blob_account + ".blob.core.windows.net", blob_key) 

Gather schema from .avro

Read a single source .avro file into a Spark DataFrame to retrieve the current schema, then use that schema to configure the Auto Loader readStream code segment.

df_tmp = spark.read.format("avro").load("/mnt/bronze/tmp/{}.avro".format(table))
 
dataset_schema = df_tmp.schema
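
Optionally, inspect the inferred schema and a few sample rows before wiring the schema into the stream (output will vary by table):

# Optional: verify the schema inferred from the sample .avro file
df_tmp.printSchema()

# Optional: peek at a few sample rows
display(df_tmp.limit(5))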

Read Stream from Auto Loader (i.e. Source)

Queue SAS Key

queue_sas = dbutils.secrets.get(scope="key-vault-secrets", key="adls-storageQueue-SAS")

cloudFiles settings

cloudFilesConf = {
  "cloudFiles.subscriptionId": dbutils.secrets.get(scope="key-vault-secrets", key="sp-subscriptionId"),
  "cloudFiles.connectionString": queue_sas,
  "cloudFiles.format": "avro",
  "cloudFiles.tenantId": dbutils.secrets.get(scope="key-vault-secrets", key="sp-tenantId"),
  "cloudFiles.clientId": dbutils.secrets.get(scope="key-vault-secrets", key="sp-clientId"),
  "cloudFiles.clientSecret": dbutils.secrets.get(scope="key-vault-secrets", key="sp-clientKey"),
  "cloudFiles.resourceGroup": dbutils.secrets.get(scope="key-vault-secrets", key="sp-rgName"),
  "cloudFiles.useNotifications": "true",
  "cloudFiles.includeExistingFiles": "true",
  "cloudFiles.validateOptions": "true",  
}
autoloader_df = (spark.readStream.format("cloudFiles")
                 .options(**cloudFilesConf)
                 .option("recursiveFileLookup", "true") # This lets us ignore folder level partitioning into the incoming Dataframe
                 .schema(dataset_schema)
                 .load("wasbs://landing@{}.blob.core.windows.net/currents/dataexport.prod-02.AzureBlob.integration.12345/event_type={}/".format(adls_account, table))
                )
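
The stated sink is a Delta table on the ADLS Gen2 mount. A minimal sketch of that write, assuming a /mnt/bronze destination and a checkpoint layout keyed by the table widget (both paths are assumptions, not this notebook's actual values):

# Minimal sketch of the Delta sink; destination and checkpoint paths are assumptions
(autoloader_df.writeStream
  .format("delta")
  .option("checkpointLocation", "/mnt/bronze/_checkpoints/{}".format(table))  # assumed checkpoint path
  .outputMode("append")
  .start("/mnt/bronze/{}".format(table))  # assumed Delta output path on the ADLS Gen2 mount
)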