2.consume-events-kafka(Scala)

Consume Events from Event Hub Kafka endpoint with Azure Schema Registry

We use the Event Hub Kafka endpoint to pull data from Event Hub, then deserialize it with from_avro using the schema stored in the Azure Schema Registry.

Import required modules

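// Schema Registry-aware from_avro
import com.microsoft.azure.schemaregistry.spark.avro.functions._
// Only HashMap is needed from java.util
import java.util.HashMap
// Enables the $"column" syntax (imported automatically in Databricks notebooks)
import spark.implicits._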

Schema Registry Property Object

Here we set the Schema Registry endpoint along with the service principal credentials (tenant ID, client ID, and client secret) used to authenticate against it.

NOTE: We prepend http:// to schema.registry.url because the Java SDK currently treats a URL without a scheme as malformed and throws an exception.

[Screenshot: URL Exception]

// Client options passed to from_avro for Schema Registry lookups and authentication
val props = new HashMap[String, String]()
props.put("schema.registry.url", "http://<your-event-hub>.servicebus.windows.net")
props.put("schema.registry.tenant.id", "<your-azure-ad-tenant-id>")
props.put("schema.registry.client.id", "<your-client-id>")
props.put("schema.registry.client.secret", "<your-client-secret>")
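The scheme workaround from the NOTE can also be applied programmatically. Below is a minimal sketch, assuming you want to normalize the endpoint before building the properties; `SchemaRegistryProps`, `withScheme`, `isWellFormed`, and `build` are hypothetical names, not part of the Azure library. It relies only on `java.net.URL`, which throws `MalformedURLException` when a string has no protocol.

```scala
import java.net.{MalformedURLException, URL}
import java.util.HashMap

// Hypothetical helper object; not part of the Azure Schema Registry library.
object SchemaRegistryProps {
  // Prepend "http://" when the endpoint has no scheme, mirroring the NOTE's workaround.
  def withScheme(endpoint: String): String =
    if (endpoint.startsWith("http://") || endpoint.startsWith("https://")) endpoint
    else "http://" + endpoint

  // True if java.net.URL accepts the string, i.e. it carries a known protocol.
  def isWellFormed(url: String): Boolean =
    try { new URL(url); true }
    catch { case _: MalformedURLException => false }

  // Builds the same four properties as the cell above, normalizing the URL first.
  def build(endpoint: String, tenantId: String, clientId: String, clientSecret: String): HashMap[String, String] = {
    val props = new HashMap[String, String]()
    props.put("schema.registry.url", withScheme(endpoint))
    props.put("schema.registry.tenant.id", tenantId)
    props.put("schema.registry.client.id", clientId)
    props.put("schema.registry.client.secret", clientSecret)
    props
  }
}
```

For example, `SchemaRegistryProps.build("<your-event-hub>.servicebus.windows.net", "<your-azure-ad-tenant-id>", "<your-client-id>", "<your-client-secret>")` yields a map whose URL already starts with `http://`.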

Kafka Config

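val TOPIC = "<your-topic>"
// Event Hubs exposes its Kafka endpoint on port 9093
val BOOTSTRAP_SERVERS = "<your-event-hub>.servicebus.windows.net:9093"
// Databricks ships a shaded Kafka client, hence the kafkashaded. prefix.
// The username is the literal string $ConnectionString; the password is the full connection string.
val EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://<your-event-hub>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<your-access-key>\";"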
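Because `EH_SASL` embeds the connection string inside a fixed JAAS template, it can be assembled from its parts instead of edited by hand. This is a sketch only: `EventHubKafkaConfig`, `bootstrapServers`, and `jaasConfig` are hypothetical helper names, and it assumes the same `kafkashaded.` prefix used in the cell above.

```scala
// Hypothetical helpers for assembling Kafka connection settings for an Event Hubs namespace.
object EventHubKafkaConfig {
  // Event Hubs serves its Kafka endpoint on port 9093 of the namespace host.
  def bootstrapServers(namespace: String): String =
    s"$namespace.servicebus.windows.net:9093"

  // Builds the JAAS config string. This is a plain (non-interpolated) literal,
  // so "$ConnectionString" stays literal text, as Event Hubs expects for the username.
  // The password is the full Event Hubs connection string.
  def jaasConfig(connectionString: String): String =
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required " +
      "username=\"$ConnectionString\" " +
      "password=\"" + connectionString + "\";"
}
```

Keeping the template in one function avoids the easy mistake of writing `s"...$ConnectionString..."`, where the `s` interpolator would try to substitute a variable named `ConnectionString`.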

Define Spark DataFrame

val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "60000")
    .option("failOnDataLoss", "false")
    .option("startingOffsets", "earliest")
    .load()
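If several streams share these settings, the options above could be collected into a single `Map` and passed in one call, since `DataStreamReader` accepts a map through `.options(...)`. The sketch below assumes that approach; `KafkaSourceOptions` is a hypothetical name, and the values simply mirror the cell above.

```scala
// Hypothetical helper: gathers the Kafka source options from the cell above into one Map.
object KafkaSourceOptions {
  def apply(topic: String, bootstrapServers: String, jaasConfig: String): Map[String, String] = Map(
    "subscribe"                -> topic,
    "kafka.bootstrap.servers"  -> bootstrapServers,
    "kafka.sasl.mechanism"     -> "PLAIN",
    "kafka.security.protocol"  -> "SASL_SSL",
    "kafka.sasl.jaas.config"   -> jaasConfig,
    "kafka.request.timeout.ms" -> "60000",
    "kafka.session.timeout.ms" -> "60000",
    // Keep running instead of failing when data may have been lost (e.g. offsets out of range).
    "failOnDataLoss"           -> "false",
    // Only applies when the query starts without a checkpoint.
    "startingOffsets"          -> "earliest"
  )
}
```

Usage in the notebook would then be `spark.readStream.format("kafka").options(KafkaSourceOptions(TOPIC, BOOTSTRAP_SERVERS, EH_SASL)).load()`.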

Deserialize with from_avro while pulling the schema from the Schema Registry

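// Decode the binary Kafka value column using the schema identified by <your-schema-guid> in the Schema Registry
val parsedDf = df.select(from_avro($"value", "<your-schema-guid>", props))
display(parsedDf)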