Skip to content

Consumer deadlock in coordinator when heartbeat request timeout #1985

Closed
@huangcuiyang

Description

@huangcuiyang

All my consumers in group is disconnected, and i find that consumer process meet a deadlock in coordinator. i want to fix the deadlock by upgrade sdk version from 1.4.1 to 1.4.7, but it happed again...help me @dpkp
hung in coordinator/base.py:1006 _handle_heartbeat_failure().self.coordinator._lock

  • kafka-python version is 1.4.7
  • Show the gdb bt in thread 1
(gdb) thread 1
[Switching to thread 1 (Thread 0x7fe31915a740 (LWP 26))]
#0  0x00007fe31896a79b in futex_abstimed_wait (cancel=true, private=<optimized out>, abstime=0x0, expected=0, 
    futex=0x2662670) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:43
43	      err = lll_futex_wait (futex, expected, private);
(gdb) bt
#0  0x00007fe31896a79b in futex_abstimed_wait (cancel=true, private=<optimized out>, abstime=0x0, expected=0, 
    futex=0x2662670) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:43
#1  do_futex_wait (sem=sem@entry=0x2662670, abstime=0x0) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:223
#2  0x00007fe31896a82f in __new_sem_wait_slow (sem=0x2662670, abstime=0x0)
    at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:292
#3  0x00007fe31896a8cb in __new_sem_wait (sem=<optimized out>) at ../nptl/sysdeps/unix/sysv/linux/sem_wait.c:28
#4  0x00007fe318c8a535 in PyThread_acquire_lock (lock=0x2662670, waitflag=1)
    at /usr/src/debug/Python-2.7.5/Python/thread_pthread.h:323
#5  0x00007fe318c8e1c2 in lock_PyThread_acquire_lock (self=0x24aa1b0, args=<optimized out>)
    at /usr/src/debug/Python-2.7.5/Modules/threadmodule.c:52
#6  0x00007fe318c5daf0 in call_function (oparg=<optimized out>, pp_stack=0x7ffd30399e30)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4408
#7  PyEval_EvalFrameEx (
    f=f@entry=Frame 0x27b5c20, for file /usr/lib64/python2.7/threading.py, line 173, in acquire (self=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, blocking=1, me=140613355153216), throwflag=throwflag@entry=0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#8  0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=1, kws=0x129c058, kwcount=0, defs=0x7fe318fae7e8, defcount=1, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#9  0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039a040, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#10 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039a040)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#11 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x129bed0, for file /usr/lib64/python2.7/threading.py, line 285, in __enter__ (self=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#12 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=args@entry=0x25922e8, argcount=1, kws=kws@entry=0x0, kwcount=kwcount@entry=0, defs=defs@entry=0x0, 
    defcount=defcount@entry=0, closure=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#13 0x00007fe318be9798 in function_call (func=<function at remote 0x7fe318fc0b18>, 
    arg=(<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>,), kw=0x0) at /usr/src/debug/Python-2.7.5/Objects/funcobject.c:526
#14 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<function at remote 0x7fe318fc0b18>, 
    arg=arg@entry=(<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLo---Type <return> to continue, or q <return> to quit---
ck__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>,), kw=kw@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#15 0x00007fe318bd38d5 in instancemethod_call (func=<function at remote 0x7fe318fc0b18>, 
    arg=(<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>,), kw=0x0) at /usr/src/debug/Python-2.7.5/Objects/classobject.c:2602
#16 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<instancemethod at remote 0x25ca6e0>, arg=arg@entry=(), 
    kw=kw@entry=0x0) at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#17 0x00007fe318bc51dc in PyObject_CallFunctionObjArgs (
    callable=callable@entry=<instancemethod at remote 0x25ca6e0>)
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2760
#18 0x00007fe318c5825b in PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2b9bff0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 1006, in _handle_heartbeat_failure (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x---Type <return> to continue, or q <return> to quit---
24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at re...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:2928
#19 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x7fe304004430, kwcount=0, defs=0x0, defcount=0, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#20 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039a8a0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#21 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039a8a0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#22 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7fe304004270, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 79, in _call_backs (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<instancemethod at remote 0x2495910>], value=None, _callbacks=[<instancemethod at remote 0x21ba0f0>]) at remote 0x7fe30c08a690>, back_type='errback', backs=[...], value=<...>, f=<instancemethod at remote 0x2495910>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#23 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=4, kws=0x284f048, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#24 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039aab0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#25 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039aab0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#26 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x284eea0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 45, in failure (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<instancemethod at remote 0x2495910>], value=None, _callbacks=[<instancemethod at remote 0x21ba0f0>]) at remote 0x7fe30c08a690>, e=<...>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#27 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x138fe70, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#28 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039acc0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#29 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039acc0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#30 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x138fcc0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 499, in _failed_request (self=<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_---Type <return> to continue, or q <return> to quit---
id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe3...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#31 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=args@entry=0x25311e8, argcount=5, kws=kws@entry=0x7fe31911a068, kwcount=kwcount@entry=0, 
    defs=defs@entry=0x0, defcount=defcount@entry=0, closure=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#32 0x00007fe318be988d in function_call (func=<function at remote 0x19897d0>, 
    arg=(<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe30c056850>, _cluster=<ClusterMetadata(_groups={'collect-aflogs-parser': 'coordinator-1'}, _coordinator_brokers={'coordinator-1': <BrokerMetadata at remote 0x2531...(truncated), kw={}) at /usr/src/debug/Python-2.7.5/Objects/funcobject.c:526
#33 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<function at remote 0x19897d0>, 
    arg=arg@entry=(<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe30c056850>, _cluster=<ClusterMetadata(_groups={'collect-aflogs-parser': 'coordinator-1'}, _coordinator_brokers={'coordinator-1': <BrokerMetadata at remote 0x2531...(truncated), kw=kw@entry={}) at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#34 0x00007fe318bd38d5 in instancemethod_call (func=<function at remote 0x19897d0>, 
    arg=(<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe30c056850>, _cluster=<ClusterMetadata(_groups={'collect-aflogs-parser': 'coordinator-1'}, _coordinator_brokers={'coordinator-1': <BrokerMetadata at remote 0x2531...(truncate---Type <return> to continue, or q <return> to quit---
d), kw={}) at /usr/src/debug/Python-2.7.5/Objects/classobject.c:2602
#35 0x00007fe318bc48e3 in PyObject_Call (func=<instancemethod at remote 0x2468aa0>, 
    arg=arg@entry=('coordinator-1', <HeartbeatRequest_v1(encode=<WeakMethod(_target_id=140613136197072, _method_id=22846832, target=<weakref at remote 0x24c1a48>, method=<weakref at remote 0x24c17e0>) at remote 0x7fe30c08ae50>, member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', group='collect-aflogs-parser', generation_id=663) at remote 0x7fe30c08a5d0>, <Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<instancemethod at remote 0x2495910>], value=None, _callbacks=[<instancemethod at remote 0x21ba0f0>]) at remote 0x7fe30c08a690>, <...>), kw=kw@entry={}) at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#36 0x00007fe31673e8f1 in partial_call (pto=0x24c16d8, args=<optimized out>, kw=0x0)
    at /usr/src/debug/Python-2.7.5/Modules/_functoolsmodule.c:205
#37 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<functools.partial at remote 0x24c16d8>, 
    arg=arg@entry=(<RequestTimedOutError at remote 0x25ca5a0>,), kw=kw@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#38 0x00007fe318c59036 in do_call (nk=<optimized out>, na=<optimized out>, pp_stack=0x7ffd3039b290, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4626
#39 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b290)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4431
#40 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2b98ca0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 79, in _call_backs (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<functools.partial at remote 0x24c16d8>], value=None, _callbacks=[<functools.partial at remote 0x24c1---Type <return> to continue, or q <return> to quit---
520>]) at remote 0x24cb610>, back_type='errback', backs=[...], value=<...>, f=<functools.partial at remote 0x24c16d8>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#41 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=4, kws=0x2738118, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#42 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039b4a0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#43 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b4a0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#44 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2737f70, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 45, in failure (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<functools.partial at remote 0x24c16d8>], value=None, _callbacks=[<functools.partial at remote 0x24c1520>]) at remote 0x24cb610>, e=<...>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#45 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x26914e8, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#46 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039b6b0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#47 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b6b0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#48 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2691320, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/conn.py, line 887, in close (self=<BrokerConnection(_gai=[], _failures=1, afi=0, state='<disconnected>', _sensors=<BrokerConnectionMetrics(metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': 'soapa_collect_proxy'}, time_window_ms=30000) at remote 0x255fa50>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x157c908>, _samples=[<Sample(event_count=228, last_window_ms=<float at remote 0x7fe30c2a2890>, value=<float at remote 0x19e8cf28>, initial_value=<float at remote 0x157c908>) at remote 0x24be510>, <Sample(event_count=294, last_window_ms=<float at remote 0x7fe30c21a068>, value=<float at remote 0x7fe30c21a1a0>, initial_value=<float at remote 0x157c908>) at remote 0x24cb750>], _current=0) at remote 0x24b7950>, _unit=3) at remote 0x24b7990>, _metric_name=<MetricName(_...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#49 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=1, kws=0x2568328, kwcount=1, defs=0x17b10e8, defcount=1, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#50 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039b8c0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#51 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b8c0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#52 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2568150, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/client_async.py, line 698, in _poll (self=<KafkaClient(_conns=<Dict at remote 0x265a9e0>, _sending=set([]), _idle---Type <return> to continue, or q <return> to quit---
_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x2641b68>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 4], [...], 2], [...], None], _OrderedDict__map={2: [...], 4: [...]}) at remote 0x265b2a0>, next_idle_close_check_time=<float at remote 0x7fe30c11fd38>) at remote 0x255fd10>, _closed=False, _pending_completion=<collections.deque at remote 0x218cc90>, _wake_r=<_socket.socket at remote 0x21aab40>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _samples=2,...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#53 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x7fe30c0c4da0, kwcount=0, defs=0x0, defcount=0, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#54 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039bad0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#55 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039bad0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#56 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7fe30c0c4bd0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/client_async.py, line 598, in poll (self=<KafkaClient(_conns=<Dict at remote 0x265a9e0>, _sending=set([]), _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x2641b68>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 4], [...], 2], [...], None], _OrderedDict__map={2: [...], 4: [...]}) at remote 0x265b2a0>, next_idle_close_check_time=<float at remote 0x7fe30c11fd38>) at remote 0x255fd10>, _closed=False, _---Type <return> to continue, or q <return> to quit---
pending_completion=<collections.deque at remote 0x218cc90>, _wake_r=<_socket.socket at remote 0x21aab40>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _sample...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#57 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=1, kws=0x2683af0, kwcount=1, defs=0x14ae890, defcount=2, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#58 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039bce0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#59 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039bce0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#60 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2683930, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/consumer/group.py, line 692, in _poll_once (self=<KafkaConsumer(_metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': 'soapa_collect_proxy'}, time_window_ms=30000) at remote 0x255fa50>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x157c908>, _samples=[<Sample(event_count=228, last_window_ms=<float at remote 0x7fe30c2a2890>, value=<float at remote 0x19e8cf28>, initial_value=<float at remote 0x157c908>) at remote 0x24be510>, <Sample(event_count=294, last_window_ms=<float at remote 0x7fe30c21a068>, value=<float at remote 0x7fe30c21a1a0>, initial_value=<float at remote 0x157c908>) at remote 0x24cb750>], _current=0) at remote 0x24b7950>, _unit=3) at remote 0x---Type <return> to continue, or q <return> to quit---
24b7990>, _metric_name=<MetricName(_group='consumer-metrics', _description='The average number of network oper...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#61 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=3, kws=0x26838f0, kwcount=1, defs=0x18e6428, defcount=1, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#62 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039bef0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#63 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039bef0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#64 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2683720, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/consumer/group.py, line 645, in poll (self=<KafkaConsumer(_metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': 'soapa_collect_proxy'}, time_window_ms=30000) at remote 0x255fa50>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x157c908>, _samples=[<Sample(event_count=228, last_window_ms=<float at remote 0x7fe30c2a2890>, value=<float at remote 0x19e8cf28>, initial_value=<float at remote 0x157c908>) at remote 0x24be510>, <Sample(event_count=294, last_window_ms=<float at remote 0x7fe30c21a068>, value=<float at remote 0x7fe30c21a1a0>, initial_value=<float at remote 0x157c908>) at remote 0x24cb750>], _current=0) at remote 0x24b7950>, _unit=3) at remote 0x24b7990>, _metric_name=<MetricName(_group='consumer-metrics', _description='The average number of network operations...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#65 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=3, kws=0x2648ef0, kwcount=0, defs=0x196f568, defcount=3, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#66 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039c100, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#67 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c100)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#68 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2648d50, for file /matrix/soapa-app/app-work/app-sdk/soapa-xxx-lib/python/utils/kafka_client.py, line 73, in poll (timeout=1000, max_count=1), throwflag=throwflag@entry=0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#69 0x00007fe318c5d4bd in fast_function (nk=<optimized out>, na=<optimized out>, n=2, pp_stack=0x7ffd3039c270, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4494
#70 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c270)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#71 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21d1de0, for file /matrix/soapa-app/app-work/app-sdk/soapa-xxx-lib/python/file_parser/base_parser.py, line 138, in start (self=<Parser(kafka_topic='aflogs-1', s3_client=<S3Client(conn=<S3Connection(request_hook=None, num_retries=6, ca_certificates_file='/matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/boto/cacerts/cacerts.txt', _provider_type='aws', http_unretryable_exceptions=[<type at remote 0x250f250>], https_connection_factory=None, is_secure=False, protocol='http', port=12001, _connection=('eds-1.novalocal', 12001, False), _auth_handler=<HmacAuthV1Handler(_hmac_256=None, host='eds-1.novalocal', _hmac=<HMAC(digest_cons=<built-in funct---Type <return> to continue, or q <return> to quit---
ion openssl_sha1>, outer=<_hashlib.HASH at remote 0x24e2490>, inner=<_hashlib.HASH at remote 0x24e24e0>, digest_size=20L) at remote 0x24b6e18>, _provider=<Provider(acl_header='x-amz-acl', server_side_encryption_header='x-amz-server-side-encryption', copy_source_version_id='x-amz-copy-source-version-id', profile_name=None, mfa_header='x-amz-...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#72 0x00007fe318c5d4bd in fast_function (nk=<optimized out>, na=<optimized out>, n=1, pp_stack=0x7ffd3039c3e0, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4494
#73 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c3e0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#74 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21c9e50, for file /matrix/soapa-app/app-work/app-sdk/soapa-xxx-lib/python/file_parser/main.py, line 38, in each_parse (setting={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'}, class_module=<module at remote 0x2141d00>, terminate=<function at remote 0x21bda28>), 
    throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#75 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=args@entry=0x7fe31911a068, argcount=0, kws=kws@entry=0x2128548, kwcount=kwcount@entry=3, 
    defs=defs@entry=0x0, defcount=defcount@entry=0, closure=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#76 0x00007fe318be988d in function_call (func=<function at remote 0x213a5f0>, arg=(), 
    kw={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'})
    at /usr/src/debug/Python-2.7.5/Objects/funcobject.c:526
#77 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<function at remote 0x213a5f0>, arg=arg@entry=(), 
    kw=kw@entry={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'})
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#78 0x00007fe318c584fd in ext_do_call (nk=<optimized out>, na=<optimized out>, flags=<optimized out>, 
    pp_stack=0x7ffd3039c6b8, func=<function at remote 0x213a5f0>)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4721
#79 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21c9c80, for file /usr/lib64/python2.7/multiprocessing/process.py, line 114, in run (self=<Process(_counter=<itertools.count at remote 0x213b998>, _daemonic=False, _target=<function at remote 0x213a5f0>, _args=(), _children=set([]), _tempdir=None, _name='Process-1', _authkey=<AuthenticationString at remote 0x17a1de0>, _parent_pid=8, _kwargs={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'}, _identity=(1,), _popen=None) at remote 0x2137a10>), throwflag=throwflag@entry=0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3079
#80 0x00007fe318c5d4bd in fast_function (nk=<optimized out>, na=<optimized out>, n=1, pp_stack=0x7ffd3039c820, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4494
#81 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c820)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#82 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21c9470, for file /usr/lib64/python2.7/multiprocessing/process.py, line 258, in _bootstrap 
  • And gdb bt in thread 2
(gdb) thread 2
[Switching to thread 2 (Thread 0x7fe308ff9700 (LWP 28))]
#0  0x00007fe31896a79b in futex_abstimed_wait (cancel=true, private=<optimized out>, abstime=0x0, expected=0, 
    futex=0x265b040) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:43
43	      err = lll_futex_wait (futex, expected, private);
(gdb) py-bt
#4 Waiting for a lock (e.g. GIL)
#5 Waiting for a lock (e.g. GIL)
#7 Frame 0x7fe30c01c130, for file /usr/lib64/python2.7/threading.py, line 173, in acquire (self=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, blocking=1, me=140613085271808)
    rc = self.__block.acquire(blocking)
#14 Frame 0x7fe304007740, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/client_async.py, line 572, in poll (self=<KafkaClient(_conns=<Dict at remote 0x265a9e0>, _sending=set([]), _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x2641b68>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 4], [...], 2], [...], None], _OrderedDict__map={2: [...], 4: [...]}) at remote 0x265b2a0>, next_idle_close_check_time=<float at remote 0x7fe30c11fd38>) at remote 0x255fd10>, _closed=False, _pending_completion=<collections.deque at remote 0x218cc90>, _wake_r=<_socket.socket at remote 0x21aab40>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _sample...(truncated)
    with self._lock:
#18 Frame 0x7fe304001690, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 965, in _run_once (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thre---Type <return> to continue, or q <return> to quit---
ad__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592c...(truncated)
    self.coordinator._client.poll(timeout_ms=0)
#22 Frame 0x7fe3040014b0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 933, in run (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592cd0>) a...(truncated)
    self._run_once()
#26 Frame 0x7fe304001290, for file /usr/lib64/python2.7/threading.py, line 811, in __bootstrap_inner (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Conditi---Type <return> to continue, or q <return> to quit---
on(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592cd0>) at remote 0x25920d0>, _Thread__stderr=<file at remot...(truncated)
    self.run()
#29 Frame 0x7fe3040010c0, for file /usr/lib64/python2.7/threading.py, line 784, in __bootstrap (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592cd0>) at remote 0x25920d0>, _Thread__stderr=<file at remote 0x7f...(truncated)
    self.__bootstrap_inner()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions