1.consume-events-eh(Scala)

Consume Events from Event Hub AMQP endpoint with Azure Schema Registry

We use the azure-eventhubs-spark connector to pull data from the Event Hub AMQP endpoint and parse the Avro payload with from_avro, which fetches the schema from the Schema Registry.

Import required modules

import com.microsoft.azure.schemaregistry.spark.avro.functions._
import java.util.HashMap

Schema Registry Property Object

We place the Schema Registry endpoint here, along with our Service Principal info.

NOTE: We're prepending http:// to the schema.registry.url, as the Java SDK currently throws an exception if the URL has no scheme:

(screenshot: URL exception thrown by the Java SDK)

val props: HashMap[String, String] = new HashMap()
props.put("schema.registry.url", "http://<your-event-hub>.servicebus.windows.net")
props.put("schema.registry.tenant.id", "<your-azure-ad-tenant-id>")
props.put("schema.registry.client.id", "<your-client-id>")
props.put("schema.registry.client.secret", "<your-client-secret>")

Event Hub Config

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
 
val connectionString = ConnectionStringBuilder("Endpoint=sb://<your-event-hub>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<your-access-key>;EntityPath=<your-topic>")
  .setEventHubName("<your-topic>")
  .build
 
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)

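The connector exposes further tuning options on EventHubsConf beyond the starting position. A minimal sketch, assuming you want to read only new events under the hub's built-in "$Default" consumer group and cap each micro-batch (the connection string is the one built above):

```scala
import org.apache.spark.eventhubs.{ EventHubsConf, EventPosition }

val tunedConf = EventHubsConf(connectionString)
  .setConsumerGroup("$Default")                       // the hub's default consumer group
  .setStartingPosition(EventPosition.fromEndOfStream) // skip history, read only new events
  .setMaxEventsPerTrigger(10000L)                     // cap events per micro-batch
```

Capping events per trigger keeps micro-batch sizes predictable when the stream first starts against a backlogged hub.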
Define Spark Dataframe

val df = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

Deserialize with from_avro while pulling the schema from the Schema Registry

val parsed_df = df.select(from_avro($"body", "<your-schema-guid>", props))
display(parsed_df)