Skip to content

[EventHubs] SDK layer exception parity for uamqp/pyamqp #26229

Closed
@swathipil

Description

@swathipil
  • EH error story: add a bunch of tests to for all test scenarios. and check that uamqp and pyamqp raise the same errors. check that we have tests for optional params, like timeout flags etc.

Test cases:

  • 1) Sending EventHubProducerClient(...connection_verify="cacert.pem").
  • 2) Buffered Producer with send_batch(), (potentially on both sync/async unsure) doesn't send messages on AttributeError("'list' object has no attribute '_internal_events'")}
  • 3) Setting retry_total>0 and 0 and have consumer client error in 2 ways:
    • retry_total>0; pass in a SASTokenCredential that expires quickly. See how client + eventprocessor behaves. should call error callback --> Issue created for this ([EventHubs] retry_total>0 sync consumer hangs when exception #27137)
    • retry_total=0, Force link detach by updating properties of EH while receiving. If retry set to 0, should call error callback right away and continue running.
    • should these close and re-open consumer and partition?
    • should only non-retryable errors close the partition?
import os
import sys
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

import logging
#uamqp_logger = logging.getLogger('uamqp')
#uamqp_logger.setLevel(logging.DEBUG)

# Configure a console output
#handler = logging.StreamHandler(stream=sys.stdout)
#uamqp_logger.addHandler(handler)

def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    print("Received event from partition: {}.".format(partition_context.partition_id))


def on_partition_initialize(partition_context):
    # Put your code here.
    print("Partition: {} has been initialized.".format(partition_context.partition_id))


def on_partition_close(partition_context, reason):
    # Put your code here.
    print("Partition: {} has been closed, reason for closing: {}.".format(
        partition_context.partition_id,
        reason
    ))
    consumer_client.close()


def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            partition_context.partition_id,
            error
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))

def update_entity(interval):    # forces service link detach
    subscription_id = os.environ['SUBSCRIPTION_ID']
    live_eventhub = {
        "resource_group" : 'swathip-test',
        "namespace": 'swathip-test-eventhubs',
        "event_hub": "eventhub-test"
    }
    from azure.mgmt.eventhub import EventHubManagementClient
    from azure.identity import EnvironmentCredential
    import threading
    mgmt_client = EventHubManagementClient(EnvironmentCredential(), subscription_id)
    def _schedule_update_properties():
        eventhub = mgmt_client.event_hubs.get(
            live_eventhub["resource_group"],
            live_eventhub["namespace"],
            live_eventhub["event_hub"]
        )
        properties = eventhub.as_dict()
        if properties["message_retention_in_days"] == 1:
            properties["message_retention_in_days"] = 2
        else:
            properties["message_retention_in_days"] = 1
        mgmt_client.event_hubs.create_or_update(
            live_eventhub["resource_group"],
            live_eventhub["namespace"],
            live_eventhub["event_hub"],
            properties
        )
        print('updating')
    t = threading.Timer(interval, _schedule_update_properties)
    t.start()


if __name__ == '__main__':
    update_entity(4)
    update_entity(6)
    update_entity(10)
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
        uamqp_transport=True,
        retry_total=0,
        logging_enable=True
    )

    #import time
    #from azure.eventhub import EventHubSharedKeyCredential
    #from azure.eventhub._client_base import EventHubSASTokenCredential
    #from azure.identity import EnvironmentCredential
    #credential = EventHubSharedKeyCredential(os.environ["EVENT_HUB_SAS_POLICY"], os.environ['EVENT_HUB_SAS_KEY'])
    #auth_uri = "sb://{}/{}".format(os.environ['EVENT_HUB_HOSTNAME'], os.environ['EVENT_HUB_NAME'])
    #token = credential.get_token(auth_uri).token
    #consumer_client = EventHubConsumerClient(
    #    fully_qualified_namespace=os.environ['EVENT_HUB_HOSTNAME'],
    #    eventhub_name=os.environ['EVENT_HUB_NAME'],
    #    consumer_group='$Default',
    #    credential=EventHubSASTokenCredential(token, time.time() + 8),
    #    uamqp_transport=True,
    #    retry_total=1,
    #    logging_enable=True
    #)

    try:
        with consumer_client:
            consumer_client.receive(
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')

Metadata

Metadata

Assignees

No one assigned

    Labels

    ClientThis issue points to a problem in the data-plane of the library.Event HubsMessagingMessaging crew

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions