Closed
Description
- EH error story: add a bunch of tests for all test scenarios, and check that uamqp and pyamqp raise the same errors. Check that we have tests for optional params, such as timeout flags.
Test cases:
- 1) Sending EventHubProducerClient(...connection_verify="cacert.pem").
- Behavior:
  - uamqp: raises `EventHubError` (checked with `pytest.raises`).
  - pyamqp: raises `ConnectError` (checked with `pytest.raises`). --> Issue here: [EventHubs] fix errors raised for invalid connection verify path #27128
  - Expected: both result in `EventHubError`.
  - Probably because uamqp takes connection_verify into JWTTokenAuth. In pyamqp, verify is taken by the Send/ReceiveClient and the values passed into JWTTokenAuth are disregarded. Fix this.
- Behavior:
- 2) Buffered Producer with send_batch() (potentially on both sync and async; unsure) doesn't send messages, due to AttributeError("'list' object has no attribute '_internal_events'").
- 3) Set retry_total>0 and retry_total=0, and make the consumer client error in 2 ways:
- retry_total>0; pass in a SASTokenCredential that expires quickly. See how client + eventprocessor behaves. should call error callback --> Issue created for this ([EventHubs] retry_total>0 sync consumer hangs when exception #27137)
- retry_total=0: force a link detach by updating properties of the EH while receiving. If retry is set to 0, it should call the error callback right away and continue running.
- should these close and re-open consumer and partition?
- should only non-retryable errors close the partition?
import os
import sys
from azure.eventhub import EventHubConsumerClient
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
import logging
#uamqp_logger = logging.getLogger('uamqp')
#uamqp_logger.setLevel(logging.DEBUG)
# Configure a console output
#handler = logging.StreamHandler(stream=sys.stdout)
#uamqp_logger.addHandler(handler)
def on_event(partition_context, event):
    """Report which partition delivered an event.

    Args:
        partition_context: Context of the partition the event came from; only
            its ``partition_id`` attribute is read.
        event: The received event. It is not inspected here.
    """
    # Placeholder for real processing. If that processing is I/O intensive,
    # a multi-threaded approach will perform better.
    source_partition = partition_context.partition_id
    message = "Received event from partition: {}.".format(source_partition)
    print(message)
def on_partition_initialize(partition_context):
    """Announce that a partition has been initialized.

    Args:
        partition_context: Context of the initialized partition; only its
            ``partition_id`` attribute is read.
    """
    # Placeholder for real initialization logic.
    ready_partition = partition_context.partition_id
    message = "Partition: {} has been initialized.".format(ready_partition)
    print(message)
def on_partition_close(partition_context, reason):
    """Print why a partition was closed, then close the consumer client.

    Args:
        partition_context: Context of the closed partition; only its
            ``partition_id`` attribute is read.
        reason: The close reason, interpolated into the printed message.
    """
    # Placeholder for real cleanup logic.
    closed_partition = partition_context.partition_id
    template = "Partition: {} has been closed, reason for closing: {}."
    print(template.format(closed_partition, reason))
    # NOTE(review): `consumer_client` is the module-level client bound in the
    # __main__ section. The call is kept inside this callback because the name
    # does not exist yet when the module body runs - confirm against the
    # original, indented script.
    consumer_client.close()
def on_error(partition_context, error):
    """Print an error raised while receiving or during load balancing.

    Args:
        partition_context: Context of the partition the error is tied to.
            It can be None in this callback, in which case the error is
            reported as a load-balancing failure.
        error: The exception to report.
    """
    # Placeholder for real error handling.
    if partition_context:
        # Argument order must follow the placeholders: the exception fills the
        # first slot and the partition id the second.
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
def update_entity(interval):  # forces service link detach
    """Schedule a one-off property update on the live Event Hub.

    A timer thread is started that, once ``interval`` has elapsed, fetches the
    hub through the management client, flips ``message_retention_in_days``
    between 1 and 2, and writes the properties back. Per the original
    author's note, the update is meant to force a service link detach.

    Args:
        interval: Delay passed to ``threading.Timer`` before the update runs.
    """
    subscription_id = os.environ['SUBSCRIPTION_ID']

    from azure.mgmt.eventhub import EventHubManagementClient
    from azure.identity import EnvironmentCredential
    import threading

    live_eventhub = {
        "resource_group": 'swathip-test',
        "namespace": 'swathip-test-eventhubs',
        "event_hub": "eventhub-test",
    }
    # Positional (resource group, namespace, hub) triple shared by both
    # management calls below.
    target = (
        live_eventhub["resource_group"],
        live_eventhub["namespace"],
        live_eventhub["event_hub"],
    )
    retention_key = "message_retention_in_days"
    client = EventHubManagementClient(EnvironmentCredential(), subscription_id)

    def _toggle_retention():
        # Read the current settings, flip retention 1 <-> 2, and push them back.
        settings = client.event_hubs.get(*target).as_dict()
        settings[retention_key] = 2 if settings[retention_key] == 1 else 1
        client.event_hubs.create_or_update(*target, settings)
        print('updating')

    threading.Timer(interval, _toggle_retention).start()
if __name__ == '__main__':
    # Queue three entity updates (after 4, 6 and 10 seconds); they are
    # intended to force link detaches while the consumer below is receiving.
    for delay in (4, 6, 10):
        update_entity(delay)

    # Scenario: retry_total=0 on the uamqp transport.
    client_options = {
        "conn_str": CONNECTION_STR,
        "consumer_group": '$Default',
        "eventhub_name": EVENTHUB_NAME,
        "uamqp_transport": True,
        "retry_total": 0,
        "logging_enable": True,
    }
    consumer_client = EventHubConsumerClient.from_connection_string(**client_options)

    # Alternative scenario (disabled): authenticate with a SAS token that
    # expires 8 seconds after creation and use retry_total=1.
    #import time
    #from azure.eventhub import EventHubSharedKeyCredential
    #from azure.eventhub._client_base import EventHubSASTokenCredential
    #from azure.identity import EnvironmentCredential
    #credential = EventHubSharedKeyCredential(os.environ["EVENT_HUB_SAS_POLICY"], os.environ['EVENT_HUB_SAS_KEY'])
    #auth_uri = "sb://{}/{}".format(os.environ['EVENT_HUB_HOSTNAME'], os.environ['EVENT_HUB_NAME'])
    #token = credential.get_token(auth_uri).token
    #consumer_client = EventHubConsumerClient(
    #    fully_qualified_namespace=os.environ['EVENT_HUB_HOSTNAME'],
    #    eventhub_name=os.environ['EVENT_HUB_NAME'],
    #    consumer_group='$Default',
    #    credential=EventHubSASTokenCredential(token, time.time() + 8),
    #    uamqp_transport=True,
    #    retry_total=1,
    #    logging_enable=True
    #)

    receive_options = {
        "on_event": on_event,
        "on_partition_initialize": on_partition_initialize,
        "on_partition_close": on_partition_close,
        "on_error": on_error,
        # "-1" is from the beginning of the partition.
        "starting_position": "-1",
    }
    try:
        with consumer_client:
            consumer_client.receive(**receive_options)
    except KeyboardInterrupt:
        print('Stopped receiving.')
- Note: Passing in an expired token credential using a connection string does not result in any errors raised for either uamqp or pyamqp. Issue [EventHubs] credential arg should override default credential from connection string #27079 created for this.