@@ -193,18 +193,43 @@ def get_attribute_payload_data(
193
193
194
194
return properties
195
195
196
- # Function to send a POST API request
196
+
197
+ # Global variables for event loop management
198
+ # `event_loop_initialized` tracks whether the event loop has been initialized.
199
+ # `main_event_loop` stores the reference to the main event loop that handles async tasks.
200
+ # `loop_lock` ensures that only one thread can initialize or use the event loop at a time.
201
+ event_loop_initialized = False
202
+ main_event_loop = None
203
+ loop_lock = threading .Lock ()
204
+
205
+ # Function to send a POST API request without waiting for the response
197
206
def send_post_api_request (properties : Dict [str , Any ], payload : Dict [str , Any ]):
198
- url = f"{ Constants .HTTPS_PROTOCOL } ://{ UrlService .get_base_url ()} { UrlEnum .EVENTS .value } "
207
+ global event_loop_initialized , main_event_loop
208
+
209
+ # Importing the SettingsManager here to avoid circular import issues or unnecessary imports
199
210
from ..services .settings_manager import SettingsManager
200
211
201
- headers = {
202
- HeadersEnum .USER_AGENT .value : payload ['d' ].get ('visitor_ua' , '' ),
203
- HeadersEnum .IP .value : payload ['d' ].get ('visitor_ip' , '' )
204
- }
212
+ # Initialize the headers dictionary for the request
213
+ headers = {}
214
+
215
+ # Retrieve 'visitor_ua' and 'visitor_ip' from the payload if they exist
216
+ # Strip any whitespace and ensure they are valid strings before adding to headers
217
+ visitor_ua = payload ['d' ].get ('visitor_ua' )
218
+ visitor_ip = payload ['d' ].get ('visitor_ip' )
219
+
220
+ # Add 'visitor_ua' to headers if it's a valid, non-empty string after stripping whitespace
221
+ if visitor_ua and isinstance (visitor_ua , str ) and visitor_ua .strip ():
222
+ headers [HeadersEnum .USER_AGENT .value ] = visitor_ua .strip ()
223
+
224
+ # Add 'visitor_ip' to headers if it's a valid, non-empty string after stripping whitespace
225
+ if visitor_ip and isinstance (visitor_ip , str ) and visitor_ip .strip ():
226
+ headers [HeadersEnum .IP .value ] = visitor_ip .strip ()
205
227
206
228
try :
229
+ # Get the instance of NetworkManager that handles making network requests
207
230
network_instance = NetworkManager .get_instance ()
231
+
232
+ # Create a RequestModel object that holds all the necessary data for the POST request
208
233
request = RequestModel (
209
234
UrlService .get_base_url (),
210
235
'POST' ,
@@ -216,40 +241,37 @@ def send_post_api_request(properties: Dict[str, Any], payload: Dict[str, Any]):
216
241
SettingsManager .get_instance ().port
217
242
)
218
243
219
- # Attempt to get the running loop
220
- try :
221
- loop = asyncio .get_event_loop ()
222
- except RuntimeError :
223
- loop = None
224
-
225
- if loop and loop .is_running ():
226
- # If an event loop is running, schedule the task safely
227
- asyncio .run_coroutine_threadsafe (network_instance .post_async (request ), loop )
228
- else :
229
- # If no event loop is running, run it in a new detached thread
230
- run_in_thread (network_instance .post_async (request ))
231
-
232
- return
233
-
244
+ # Lock the event loop initialization to prevent race conditions in multi-threaded environments
245
+ with loop_lock :
246
+ # Check if the event loop is already initialized and running
247
+ if event_loop_initialized and main_event_loop .is_running ():
248
+ # If the loop is running, submit the asynchronous POST request to the loop
249
+ # This will not block the main thread
250
+ asyncio .run_coroutine_threadsafe (network_instance .post_async (request ), main_event_loop )
251
+ else :
252
+ # If the event loop has not been initialized or is not running:
253
+ # 1. Mark the event loop as initialized
254
+ # 2. Create a new event loop
255
+ # 3. Start the event loop in a separate thread so it doesn't block the main thread
256
+ event_loop_initialized = True
257
+ main_event_loop = asyncio .new_event_loop ()
258
+ threading .Thread (target = start_event_loop , args = (main_event_loop ,), daemon = True ).start ()
259
+
260
+ # Submit the asynchronous POST request to the newly started event loop
261
+ asyncio .run_coroutine_threadsafe (network_instance .post_async (request ), main_event_loop )
262
+
234
263
except Exception as err :
235
264
LogManager .get_instance ().error (
236
265
error_messages .get ('NETWORK_CALL_FAILED' ).format (
237
- method = 'POST' ,
238
- err = err ,
266
+ method = 'POST' ,
267
+ err = err ,
239
268
),
240
269
)
241
270
242
- # Function to run an asyncio event loop in a separate thread
243
- def run_in_thread (coro ):
244
- def run ():
245
- loop = asyncio .new_event_loop ()
246
- asyncio .set_event_loop (loop )
247
- try :
248
- loop .run_until_complete (coro )
249
- finally :
250
- loop .run_until_complete (loop .shutdown_asyncgens ())
251
- loop .close ()
252
-
253
- thread = threading .Thread (target = run )
254
- thread .daemon = True # Ensure the thread does not block program exit
255
- thread .start ()
271
+ # Function to start the event loop in a new thread
272
+ def start_event_loop (loop ):
273
+ # Set the provided loop as the current event loop for the new thread
274
+ asyncio .set_event_loop (loop )
275
+
276
+ # Run the event loop indefinitely to handle any submitted asynchronous tasks
277
+ loop .run_forever ()
0 commit comments