# ADLS (Gen2) storage account: register the access key for both the Blob and DFS endpoints
adls_account = dbutils.secrets.get(scope="key-vault-secrets", key="adls-storageAccount-Name")
adls_key = dbutils.secrets.get(scope="key-vault-secrets", key="adls-storageAccount-AccessKey")
spark.conf.set("fs.azure.account.key." + adls_account + ".blob.core.windows.net", adls_key)
spark.conf.set("fs.azure.account.key." + adls_account + ".dfs.core.windows.net", adls_key)
# Blob storage account: register the access key for the Blob endpoint
blob_account = dbutils.secrets.get(scope="key-vault-secrets", key="blob-storageAccount-Name")
blob_key = dbutils.secrets.get(scope="key-vault-secrets", key="blob-storageAccount-AccessKey")
spark.conf.set("fs.azure.account.key." + blob_account + ".blob.core.windows.net", blob_key)
cloudFilesConf = {
    "cloudFiles.subscriptionId": dbutils.secrets.get(scope="key-vault-secrets", key="sp-subscriptionId"),
    "cloudFiles.connectionString": queue_sas,
    "cloudFiles.format": "avro",
    "cloudFiles.tenantId": dbutils.secrets.get(scope="key-vault-secrets", key="sp-tenantId"),
    "cloudFiles.clientId": dbutils.secrets.get(scope="key-vault-secrets", key="sp-clientId"),
    "cloudFiles.clientSecret": dbutils.secrets.get(scope="key-vault-secrets", key="sp-clientKey"),
    "cloudFiles.resourceGroup": dbutils.secrets.get(scope="key-vault-secrets", key="sp-rgName"),
    "cloudFiles.useNotifications": "true",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.validateOptions": "true",
}
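
# dataset_schema and table are assumed to be defined earlier in the notebook
# (table typically comes from the table_name parameter; see the sketch below).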
autoloader_df = (spark.readStream.format("cloudFiles")
.options(**cloudFilesConf)
.option("recursiveFileLookup", "true") # This lets us ignore folder level partitioning into the incoming Dataframe
.schema(dataset_schema)
.load("wasbs://landing@{}.blob.core.windows.net/currents/dataexport.prod-02.AzureBlob.integration.12345/event_type={}/".format(adls_account, table))
)
Read landing stream from Blob via Auto Loader, parse, and write to Delta

We will leverage Structured Streaming to ingest .avro data from Blob via Auto Loader (source) and persist it to a DBFS mount (Azure Data Lake Gen2) in Delta format (sink).

NOTE: This notebook is parameterized; based on the table_name passed in, the pipeline can be reused for multiple tables.
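
The code above covers only the source side. A minimal sketch of the parameter retrieval and the Delta sink, assuming a hypothetical DBFS mount at /mnt/datalake and a widget named table_name (both are illustrative assumptions, not taken from the original notebook):

# Hypothetical parameter retrieval: the notebook is parameterized by table_name.
table = dbutils.widgets.get("table_name")

# Minimal Delta sink sketch; the mount point and checkpoint layout are assumptions.
(autoloader_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/datalake/_checkpoints/{}".format(table))
    .outputMode("append")
    .start("/mnt/datalake/delta/{}".format(table))
)

The per-table checkpoint location is what lets the stream restart without reprocessing files Auto Loader has already ingested, and it keeps reruns of the notebook safe when it is reused across tables.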