Skip to content

Commit 633cfe0

Browse files
seyoon-limshaun.glass
authored andcommitted
Fix spark driver integration (getsentry#3162)
Changed the calling position of the `spark_context_init` func to ensure that SparkIntegration is used prior to the creation of the Spark session. --------- Co-authored-by: shaun.glass <[email protected]>
1 parent 41c0aff commit 633cfe0

File tree

2 files changed

+46
-24
lines changed

2 files changed

+46
-24
lines changed

sentry_sdk/integrations/spark/spark_driver.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def patch_spark_context_init():
5959
@ensure_integration_enabled(SparkIntegration, spark_context_init)
6060
def _sentry_patched_spark_context_init(self, *args, **kwargs):
6161
# type: (SparkContext, *Any, **Any) -> Optional[Any]
62+
rv = spark_context_init(self, *args, **kwargs)
6263
_start_sentry_listener(self)
6364
_set_app_properties()
6465

@@ -71,6 +72,9 @@ def process_event(event, hint):
7172
if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
7273
return event
7374

75+
if self._active_spark_context is None:
76+
return event
77+
7478
event.setdefault("user", {}).setdefault("id", self.sparkUser())
7579

7680
event.setdefault("tags", {}).setdefault(
@@ -96,7 +100,7 @@ def process_event(event, hint):
96100

97101
return event
98102

99-
return spark_context_init(self, *args, **kwargs)
103+
return rv
100104

101105
SparkContext._do_init = _sentry_patched_spark_context_init
102106

tests/integrations/spark/test_spark.py

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import pytest
22
import sys
3+
from unittest.mock import patch
34
from sentry_sdk.integrations.spark.spark_driver import (
45
_set_app_properties,
56
_start_sentry_listener,
67
SentryListener,
8+
SparkIntegration,
79
)
8-
910
from sentry_sdk.integrations.spark.spark_worker import SparkWorkerIntegration
1011

1112
from pyspark import SparkContext
@@ -40,27 +41,27 @@ def test_start_sentry_listener():
4041
assert gateway._callback_server is not None
4142

4243

43-
@pytest.fixture
44-
def sentry_listener(monkeypatch):
45-
class MockHub:
46-
def __init__(self):
47-
self.args = []
48-
self.kwargs = {}
44+
def test_initialize_spark_integration(sentry_init):
45+
sentry_init(integrations=[SparkIntegration()])
46+
SparkContext.getOrCreate()
47+
4948

50-
def add_breadcrumb(self, *args, **kwargs):
51-
self.args = args
52-
self.kwargs = kwargs
49+
@pytest.fixture
50+
def sentry_listener():
5351

5452
listener = SentryListener()
55-
mock_hub = MockHub()
5653

57-
monkeypatch.setattr(listener, "hub", mock_hub)
54+
return listener
55+
5856

59-
return listener, mock_hub
57+
@pytest.fixture
58+
def mock_add_breadcrumb():
59+
with patch("sentry_sdk.add_breadcrumb") as mock:
60+
yield mock
6061

6162

62-
def test_sentry_listener_on_job_start(sentry_listener):
63-
listener, mock_hub = sentry_listener
63+
def test_sentry_listener_on_job_start(sentry_listener, mock_add_breadcrumb):
64+
listener = sentry_listener
6465

6566
class MockJobStart:
6667
def jobId(self): # noqa: N802
@@ -69,15 +70,20 @@ def jobId(self): # noqa: N802
6970
mock_job_start = MockJobStart()
7071
listener.onJobStart(mock_job_start)
7172

73+
mock_add_breadcrumb.assert_called_once()
74+
mock_hub = mock_add_breadcrumb.call_args
75+
7276
assert mock_hub.kwargs["level"] == "info"
7377
assert "sample-job-id-start" in mock_hub.kwargs["message"]
7478

7579

7680
@pytest.mark.parametrize(
7781
"job_result, level", [("JobSucceeded", "info"), ("JobFailed", "warning")]
7882
)
79-
def test_sentry_listener_on_job_end(sentry_listener, job_result, level):
80-
listener, mock_hub = sentry_listener
83+
def test_sentry_listener_on_job_end(
84+
sentry_listener, mock_add_breadcrumb, job_result, level
85+
):
86+
listener = sentry_listener
8187

8288
class MockJobResult:
8389
def toString(self): # noqa: N802
@@ -94,13 +100,16 @@ def jobResult(self): # noqa: N802
94100
mock_job_end = MockJobEnd()
95101
listener.onJobEnd(mock_job_end)
96102

103+
mock_add_breadcrumb.assert_called_once()
104+
mock_hub = mock_add_breadcrumb.call_args
105+
97106
assert mock_hub.kwargs["level"] == level
98107
assert mock_hub.kwargs["data"]["result"] == job_result
99108
assert "sample-job-id-end" in mock_hub.kwargs["message"]
100109

101110

102-
def test_sentry_listener_on_stage_submitted(sentry_listener):
103-
listener, mock_hub = sentry_listener
111+
def test_sentry_listener_on_stage_submitted(sentry_listener, mock_add_breadcrumb):
112+
listener = sentry_listener
104113

105114
class StageInfo:
106115
def stageId(self): # noqa: N802
@@ -120,6 +129,9 @@ def stageInfo(self): # noqa: N802
120129
mock_stage_submitted = MockStageSubmitted()
121130
listener.onStageSubmitted(mock_stage_submitted)
122131

132+
mock_add_breadcrumb.assert_called_once()
133+
mock_hub = mock_add_breadcrumb.call_args
134+
123135
assert mock_hub.kwargs["level"] == "info"
124136
assert "sample-stage-id-submit" in mock_hub.kwargs["message"]
125137
assert mock_hub.kwargs["data"]["attemptId"] == 14
@@ -163,13 +175,16 @@ def stageInfo(self): # noqa: N802
163175

164176

165177
def test_sentry_listener_on_stage_completed_success(
166-
sentry_listener, get_mock_stage_completed
178+
sentry_listener, mock_add_breadcrumb, get_mock_stage_completed
167179
):
168-
listener, mock_hub = sentry_listener
180+
listener = sentry_listener
169181

170182
mock_stage_completed = get_mock_stage_completed(failure_reason=False)
171183
listener.onStageCompleted(mock_stage_completed)
172184

185+
mock_add_breadcrumb.assert_called_once()
186+
mock_hub = mock_add_breadcrumb.call_args
187+
173188
assert mock_hub.kwargs["level"] == "info"
174189
assert "sample-stage-id-submit" in mock_hub.kwargs["message"]
175190
assert mock_hub.kwargs["data"]["attemptId"] == 14
@@ -178,13 +193,16 @@ def test_sentry_listener_on_stage_completed_success(
178193

179194

180195
def test_sentry_listener_on_stage_completed_failure(
181-
sentry_listener, get_mock_stage_completed
196+
sentry_listener, mock_add_breadcrumb, get_mock_stage_completed
182197
):
183-
listener, mock_hub = sentry_listener
198+
listener = sentry_listener
184199

185200
mock_stage_completed = get_mock_stage_completed(failure_reason=True)
186201
listener.onStageCompleted(mock_stage_completed)
187202

203+
mock_add_breadcrumb.assert_called_once()
204+
mock_hub = mock_add_breadcrumb.call_args
205+
188206
assert mock_hub.kwargs["level"] == "warning"
189207
assert "sample-stage-id-submit" in mock_hub.kwargs["message"]
190208
assert mock_hub.kwargs["data"]["attemptId"] == 14

0 commit comments

Comments
 (0)