Pub / Sub doesn't work with multiprocessing #3890

Closed
@theacodes

This code fails with a pickle error:

def multiprocessing_callback(message):
    print('Received message: {}'.format(message))
    message.ack()


def receive_messages_with_multiprocessing(
        project, topic_name, subscription_name):
    """Receives messages from a pull subscription using multiprocessing."""
    import concurrent.futures
    import functools
    import multiprocessing
    import time

    from google.cloud import pubsub_v1
    import google.cloud.pubsub_v1.subscriber.policy.thread

    # Create a process pool and a queue for sending messages. This will
    # be used by the subscriber policy to execute callbacks.
    executor = concurrent.futures.ProcessPoolExecutor()
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    policy_factory = functools.partial(
        google.cloud.pubsub_v1.subscriber.policy.thread.Policy,
        executor=executor,
        queue=queue)
    subscriber = pubsub_v1.SubscriberClient(policy_class=policy_factory)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    subscriber.subscribe(subscription_path, callback=multiprocessing_callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
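
For anyone trying to narrow this down: a process pool has to pickle whatever it hands to a worker, so the callback arguments must be picklable. The sketch below uses only the standard library and a hypothetical `FakeMessage` class (not the real Pub/Sub message type). It assumes, without confirmation from the traceback, that the message carries a reference to something unpicklable such as a lock; the `can_pickle` helper is also made up for illustration.

```python
import pickle
import threading


class FakeMessage:
    """Hypothetical stand-in for a Pub/Sub message; not the real class."""

    def __init__(self, data):
        self.data = data
        # Assumption: the real message keeps a reference to state that
        # cannot be pickled (for example a lock or a network channel).
        self._lock = threading.Lock()


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


if __name__ == "__main__":
    # Plain bytes can cross a process boundary.
    print(can_pickle(b"payload"))
    # An object holding a lock cannot, so a process pool would reject it.
    print(can_pickle(FakeMessage(b"payload")))
```

If the real message object behaves like `FakeMessage` here, passing only the raw payload to the worker process and acknowledging in the parent process might sidestep the error, though that is untested against the library.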

Labels

api: pubsub (Issues related to the Pub/Sub API.)
priority: p1 (Important issue which blocks shipping the next release. Will be fixed prior to next release.)
release blocking (Required feature/issue must be fixed prior to next release.)
triaged for GA
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
