Closed
Description
This code fails with a pickle error:
def multiprocessing_callback(message):
    """Print a received message and acknowledge it.

    Args:
        message: The received message object. It must support string
            formatting and expose an ``ack()`` method.
    """
    text = 'Received message: {}'.format(message)
    print(text)
    # Acknowledge only after the message has been reported.
    message.ack()
def receive_messages_with_multiprocessing(
        project, topic_name, subscription_name):
    """Pull messages from a subscription, running callbacks in a process pool.

    Builds a subscriber whose policy is handed a ``ProcessPoolExecutor`` and a
    manager-backed queue, subscribes ``multiprocessing_callback`` to the
    subscription, and then keeps the calling thread alive indefinitely.

    Args:
        project: Project identifier used to build the subscription path.
        topic_name: Not used by this function's body.
        subscription_name: Name of the pull subscription to listen on.

    This function never returns; it loops forever, sleeping between wake-ups.
    """
    import concurrent.futures
    import functools
    import multiprocessing

    import google.cloud.pubsub_v1.subscriber.policy.thread

    # The process pool executes the callbacks, and the manager-backed queue
    # is what the subscriber policy uses for sending messages.
    # NOTE(review): a process pool has to pickle whatever is submitted to it
    # (the callable and its arguments). The reported pickle error presumably
    # comes from something handed to the pool not being picklable -- confirm
    # against the installed library version.
    process_pool = concurrent.futures.ProcessPoolExecutor()
    mp_manager = multiprocessing.Manager()
    message_queue = mp_manager.Queue()

    # Pre-bind the executor and queue so the client can instantiate the
    # policy itself with these two arguments already filled in.
    policy_cls = google.cloud.pubsub_v1.subscriber.policy.thread.Policy
    make_policy = functools.partial(
        policy_cls,
        executor=process_pool,
        queue=message_queue,
    )

    subscriber = pubsub_v1.SubscriberClient(policy_class=make_policy)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)
    subscriber.subscribe(subscription_path, callback=multiprocessing_callback)

    # The subscriber is non-blocking, so the main thread is parked here to
    # let messages be processed in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)