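import java.util.HashMap

// Connection settings for Azure Schema Registry, passed later to from_avro.
val props: HashMap[String, String] = new HashMap()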
props.put("schema.registry.url", "http://<your-event-hub>.servicebus.windows.net")
props.put("schema.registry.tenant.id", "<your-azure-ad-tenant-id>")
props.put("schema.registry.client.id", "<your-client-id>")
props.put("schema.registry.client.secret", "<your-client-secret>")
val TOPIC = "<your-topic>"
val BOOTSTRAP_SERVERS = "<your-event-hub>.servicebus.windows.net:9093"
val EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://<your-event-hub>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<your-access-key>\";"
val df = spark.readStream
  .format("kafka")
  .option("subscribe", TOPIC)
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", EH_SASL)
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "60000")
  .option("failOnDataLoss", "false")
  .option("startingOffsets", "earliest")
  .load()
Consume Events from Event Hub Kafka endpoint with Azure Schema Registry
We use the Event Hub Kafka-compatible endpoint to pull data from Event Hub, then parse the message values with from_avro, using schemas from the Schema Registry.
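
The parsing step is described here but not shown in the cell. As a rough sketch only, the snippet below decodes the Kafka `value` column with Spark's built-in `org.apache.spark.sql.avro.functions.from_avro` (Spark 3.0+, spark-avro module) and a hand-written schema. This is a swapped-in technique, not the registry-aware `from_avro` the notebook refers to, which comes from a separate library and is expected to take the `props` map; its exact signature is not shown in the source. The schema, the `ExampleEvent` record, and the helper name `parseAvroValues` are hypothetical, and the sketch assumes the message value is plain Avro binary with no registry-specific framing.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// Hypothetical schema for illustration only; replace it with the schema
// your producers actually register.
val exampleSchemaJson: String =
  """{
    |  "type": "record",
    |  "name": "ExampleEvent",
    |  "fields": [
    |    {"name": "id", "type": "string"},
    |    {"name": "amount", "type": "double"}
    |  ]
    |}""".stripMargin

// Decode the binary Kafka `value` column into a struct, then flatten the
// struct so each Avro field becomes a top-level column.
def parseAvroValues(kafkaDf: DataFrame, schemaJson: String): DataFrame =
  kafkaDf
    .select(from_avro(col("value"), schemaJson).as("event"))
    .select("event.*")

// Usage with the streaming DataFrame defined in the cell above:
// val parsed = parseAvroValues(df, exampleSchemaJson)
```

With the registry-aware variant, the schema would be resolved through the Schema Registry rather than hard-coded, so schema changes would not require editing the notebook.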