1.produce-events(Python)

Produce events to Event Hub with Azure Schema Registry

This code is a slight modification of an existing sample; the main change is that the data being sent is randomized.

Import required modules

import os
 
from azure.identity import ClientSecretCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer
 
from azure.eventhub import EventHubProducerClient, EventData

Service Principal Secret

This Service Principal should be assigned the Schema Registry Contributor (Preview) role on the Event Hub namespace that contains the Schema Registry.

token_credential = ClientSecretCredential(
    tenant_id="<your-azure-ad-tenant-id>",
    client_id="<your-client-id>",
    client_secret="<your-client-secret>"
)
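Hardcoding the secret in a notebook cell is easy to leak. As a minimal sketch, the three values could instead be read from environment variables. The helper name `load_service_principal` and the variable names are assumptions made for this example, not part of the original notebook.

```python
import os


def load_service_principal(env=os.environ):
    """Collect the keyword arguments for ClientSecretCredential from a mapping."""
    # Assumed variable names for this sketch; use whatever your environment defines.
    names = {
        "tenant_id": "AZURE_TENANT_ID",
        "client_id": "AZURE_CLIENT_ID",
        "client_secret": "AZURE_CLIENT_SECRET",
    }
    # Fail early with a readable message if anything is unset or empty.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {arg: env[var] for arg, var in names.items()}
```

With those variables set, the credential above could be built as `ClientSecretCredential(**load_service_principal())`.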

Schema Registry and Schema Definition

Note that this is the Event Hub Namespace we want to leverage for our Schema Registry:

SCHEMA_REGISTRY_ENDPOINT = "<your-event-hub>.servicebus.windows.net"
SCHEMA_GROUP = "user_schemagroup"
SCHEMA_STRING = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "user_name", "type": "string"},
     {"name": "user_score",  "type": ["int", "null"]},
     {"name": "user_id", "type": ["string", "null"]}
 ]
}"""
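The schema definition is plain JSON, so it can be inspected with the standard library before anything is sent. The sketch below lists the field names and reports which fields are nullable (an Avro union that includes `"null"`). The helper names `field_names` and `is_nullable` are hypothetical, and this is only a sanity check, not Avro validation.

```python
import json

SCHEMA_STRING = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "user_name", "type": "string"},
     {"name": "user_score",  "type": ["int", "null"]},
     {"name": "user_id", "type": ["string", "null"]}
 ]
}"""


def field_names(schema_string):
    """Return the record's field names in declaration order."""
    schema = json.loads(schema_string)
    return [field["name"] for field in schema["fields"]]


def is_nullable(schema_string, name):
    """Return True if the named field is a union type that includes "null"."""
    for field in json.loads(schema_string)["fields"]:
        if field["name"] == name:
            field_type = field["type"]
            return isinstance(field_type, list) and "null" in field_type
    raise KeyError(name)
```

Here `user_name` is required, while `user_score` and `user_id` may be null.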

Event Hub for Data

Note that this is the Event Hub Instance we want to leverage for sending our Event Data:

EVENTHUB_CONNECTION_STR = "Endpoint=sb://<your-event-hub>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<your-access-key>;EntityPath=<your-topic>"
EVENTHUB_NAME = "<your-topic>"
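The placeholders above suggest that `EntityPath` in the connection string and `EVENTHUB_NAME` refer to the same Event Hub. A small sketch for checking that locally is shown here; `parse_connection_string` and `entity_matches` are hypothetical helpers written for this example, not Azure SDK functions.

```python
def parse_connection_string(conn_str):
    """Split a 'Key=Value;Key=Value' connection string into a dict."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only: access keys are often base64 and end in '='.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def entity_matches(conn_str, eventhub_name):
    """Return True if the connection string's EntityPath equals eventhub_name."""
    return parse_connection_string(conn_str).get("EntityPath") == eventhub_name
```

Calling `entity_matches(EVENTHUB_CONNECTION_STR, EVENTHUB_NAME)` before creating the client can catch a copy-and-paste mismatch early.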

Function for generating event data

import random
import string
import uuid
 
def send_event_data_batch(producer, serializer):
    """Serialize one randomized User record and send it as a single-event batch."""
    event_data_batch = producer.create_batch()

    # Randomize the record: 10 lowercase letters, a score from 1 to 9, and a fresh UUID.
    username = ''.join(random.choice(string.ascii_lowercase) for _ in range(10))
    number = random.randint(1, 9)
    guid = str(uuid.uuid4())

    dict_data = {"user_name": username, "user_score": number, "user_id": guid}
    # Use the serialize method to convert the dict to bytes using the given Avro schema.
    # The serialize method automatically registers the schema with the Schema Registry
    # service and caches it locally for future use.
    payload_bytes = serializer.serialize(data=dict_data, schema=SCHEMA_STRING)
    print('The serialized dict data is {}.'.format(payload_bytes))

    event_data = EventData(body=payload_bytes)  # pass the serialized bytes as the EventData body
    event_data_batch.add(event_data)
    producer.send_batch(event_data_batch)
    print('Send is done.')
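The randomization can also be pulled out into a pure function, which makes it possible to inspect or test the generated records without touching Event Hubs. This is a sketch of one way to do that; `make_random_user` and its `rng` parameter are assumptions for illustration and do not appear in the original notebook.

```python
import random
import string
import uuid


def make_random_user(rng=random):
    """Build one record shaped like the User schema with randomized values."""
    # rng may be the random module or a random.Random instance (e.g. seeded for tests).
    username = "".join(rng.choice(string.ascii_lowercase) for _ in range(10))
    return {
        "user_name": username,
        "user_score": rng.randint(1, 9),   # inclusive on both ends
        "user_id": str(uuid.uuid4()),      # not controlled by rng
    }
```

The returned dict has the same keys as `dict_data` in the function above, so it could be passed to the serializer in its place.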

Event Hub Client

# create an EventHubProducerClient instance
eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENTHUB_CONNECTION_STR,
    eventhub_name=EVENTHUB_NAME
)

Schema Registry Client

# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
    schema_registry=SchemaRegistryClient(
        endpoint=SCHEMA_REGISTRY_ENDPOINT,
        credential=token_credential
    ),
    schema_group=SCHEMA_GROUP
)
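With both clients created, the send function still has to be invoked. A minimal sketch of the wiring follows, assuming both objects support the context-manager protocol so they are closed when the block exits. The helper `run_producer` and its `count` parameter are hypothetical names introduced for this example.

```python
def run_producer(producer, serializer, send_fn, count=1):
    """Call send_fn(producer, serializer) count times, then close both clients."""
    # Assumption: producer and serializer both implement __enter__/__exit__.
    with producer, serializer:
        for _ in range(count):
            send_fn(producer, serializer)
```

For example, `run_producer(eventhub_producer, avro_serializer, send_event_data_batch, count=10)` would send ten randomized events, one batch per call.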