stream_atlas_entities_cleaned (Scala)


Azure Purview: Monitoring Atlas Hook atlas_entities via Spark Streaming

Input widget for the Offset range, keeping things identical to Kafdrop

// Set default
dbutils.widgets.text("Offset", "0")
 
// Store in variable
val offset = dbutils.widgets.get("Offset").toInt
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
 
// Environment values for the Purview Atlas Kafka endpoint (Event Hubs)
val TOPIC = "atlas_entities"
val BOOTSTRAP_SERVERS = "atlas-3f2....servicebus.windows.net:9093"
val EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://atlas-3f2....servicebus.windows.net/;SharedAccessKeyName=AlternateSharedAccessKey;SharedAccessKey=QBsp+45j8drJ...f6+lE=\";"
 
// Read Raw Dataframe from Kafka
val entitiesDF = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "60000")
    .option("failOnDataLoss", "false")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("offset", "timestamp", "CAST(value AS STRING)")
    .filter($"offset".geq(offset)) // filter based on widget
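To peek at the raw records before parsing, the streaming DataFrame can be displayed directly (a minimal check; display starts a streaming query in Databricks notebooks):

// Inspect the raw Kafka records (offset, timestamp, value as string) as they arrive
display(entitiesDF)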

Schema inference from JSON payload

The classifications array appears to be as deeply nested as the payload gets, so we use a sample message containing it to generate the schema.

val jsData = Seq(
  ("""{
   "version": {
      "version": "1.0.0",
      "versionParts": [
         1
      ]
   },
   "msgCompressionKind": "NONE",
   "msgSplitIdx": 1,
   "msgSplitCount": 1,
   "msgSourceIP": "10.244.81.3",
   "msgCreatedBy": "",
   "msgCreationTime": 1633280059960,
   "message": {
      "type": "ENTITY_NOTIFICATION_V2",
      "entity": {
         "typeName": "azure_sql_column",
         "attributes": {
            "qualifiedName": "mssql://aemigration.database.windows.net/contosoHR_Plaintext/dbo/Employees#SSN",
            "name": "SSN"
         },
         "guid": "e338f6c9-8c2c-4cf8-ac87-cff6f6f60004",
         "status": "ACTIVE",
         "displayText": "SSN",
         "classificationNames": [
            "MICROSOFT.GOVERNMENT.US.SOCIAL_SECURITY_NUMBER",
            "Microsoft.Label.08A42F2B_FE8A_4FA5_9BF9_90B8CDBA7373"
         ],
         "classifications": [
            {
               "typeName": "MICROSOFT.GOVERNMENT.US.SOCIAL_SECURITY_NUMBER",
               "lastModifiedTS": "1",
               "entityGuid": "e338f6c9-8c2c-4cf8-ac87-cff6f6f60004",
               "entityStatus": "ACTIVE",
               "source": "DataScan",
               "sourceDetails": {
                  "ClassificationRuleId": "54115c6e-5a50-468a-88cb-fc537eb48e69",
                  "ClassificationRuleType": "System",
                  "ClassificationRuleVersion": "1",
                  "MCE_Confidence": "94",
                  "MCE_RuleId": "a44669fe-0d48-453d-a9b1-2cc83f2cba77",
                  "MCE_Name": "MCE.U.S._Social_Security_Number_(SSN)",
                  "MCE_Count": "28",
                  "MCE_UniqueCount": "28"
               }
            },
            {
               "typeName": "Microsoft.Label.08A42F2B_FE8A_4FA5_9BF9_90B8CDBA7373",
               "lastModifiedTS": "1",
               "entityGuid": "e338f6c9-8c2c-4cf8-ac87-cff6f6f60004",
               "entityStatus": "ACTIVE",
               "source": "LabelService"
            }
         ]
      },
      "operationType": "ENTITY_UPDATE",
      "eventTime": 1633280059739
   }
}""")
)
 
val schema: StructType = spark.read.json(jsData.toDS).schema
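As a quick sanity check (optional, not part of the original cell), the inferred schema can be printed to confirm that the nested classifications and sourceDetails structures were captured:

// Optional: print the inferred schema tree to verify the nested fields are present
schema.printTreeString()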

parsedDF contains the fully parsed payload:

val parsedDF = entitiesDF
               .select($"*", from_json(col("value"), schema).as("data"))
               .select("offset", "timestamp", "data.*")
 
display(parsedDF)
ENTITY_CREATE
val ENTITY_CREATE_DF = parsedDF
                          .filter(parsedDF("message.operationType") === "ENTITY_CREATE")
                          .select("offset", "timestamp", "message.*")
 
display(ENTITY_CREATE_DF)
ENTITY_UPDATE
val ENTITY_UPDATE_DF = parsedDF
                          .filter(parsedDF("message.operationType") === "ENTITY_UPDATE")
                          .select("offset", "timestamp", "message.*")
 
display(ENTITY_UPDATE_DF)
ENTITY_DELETE
val ENTITY_DELETE_DF = parsedDF
                          .filter(parsedDF("message.operationType") === "ENTITY_DELETE")
                          .select("offset", "timestamp", "message.*")
 
display(ENTITY_DELETE_DF)
CLASSIFICATION_ADD
val CLASSIFICATION_ADD_DF = parsedDF
                          .filter(parsedDF("message.operationType") === "CLASSIFICATION_ADD")
                          .select("offset", "timestamp", "message.*")
 
display(CLASSIFICATION_ADD_DF)
CLASSIFICATION_UPDATE
val CLASSIFICATION_UPDATE_DF = parsedDF
                          .filter(parsedDF("message.operationType") === "CLASSIFICATION_UPDATE")
                          .select("offset", "timestamp", "message.*")
 
display(CLASSIFICATION_UPDATE_DF)
CLASSIFICATION_DELETE
val CLASSIFICATION_DELETE_DF = parsedDF
                          .filter(parsedDF("message.operationType") === "CLASSIFICATION_DELETE")
                          .select("offset", "timestamp", "message.*")
 
display(CLASSIFICATION_DELETE_DF)
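The six cells above differ only in the operation type; a small helper (a sketch, not in the original notebook) captures the shared pattern:

// Hypothetical helper: one filtered view per Atlas notification operation type
def byOperationType(opType: String): DataFrame =
  parsedDF
    .filter(parsedDF("message.operationType") === opType)
    .select("offset", "timestamp", "message.*")
 
display(byOperationType("ENTITY_CREATE"))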

Of course, we would filter on the specific data sources we want to monitor by parameterizing this pipeline as required (in this case, our Azure SQL DB).
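As an illustration (a sketch; the mssql:// prefix below is taken from the sample payload above and would normally come from another widget), scoping the stream to a single Azure SQL server could look like this:

// Hypothetical data-source filter: keep only notifications whose qualifiedName
// sits under our Azure SQL server (prefix taken from the sample payload)
val sourcePrefix = "mssql://aemigration.database.windows.net/"
 
val scopedDF = parsedDF
  .filter($"message.entity.attributes.qualifiedName".startsWith(sourcePrefix))
 
display(scopedDF)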