1
+ from __future__ import annotations
2
+
3
+ import logging
1
4
import os
2
5
import sqlite3
3
6
import threading
4
- import logging
5
7
import time
6
- from _error import Timeout
7
- from filelock ._api import AcquireReturnProxy , BaseFileLock
8
- from typing import Literal , Any
9
8
from contextlib import contextmanager
9
+ from typing import Any , Literal
10
10
from weakref import WeakValueDictionary
11
11
12
+ from _error import Timeout
13
+
14
+ from filelock ._api import AcquireReturnProxy
15
+
12
16
_LOGGER = logging .getLogger ("filelock" )
13
17
14
18
# PRAGMA busy_timeout=N delegates to https://www.sqlite.org/c3ref/busy_timeout.html,
15
19
# which accepts an int argument, which has the maximum value of 2_147_483_647 on 32-bit
16
20
# systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days.
17
21
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1
18
22
23
+
19
24
def timeout_for_sqlite (timeout : float , blocking : bool , already_waited : float ) -> int :
20
25
if blocking is False :
21
26
return 0
22
-
27
+
23
28
if timeout == - 1 :
24
29
return _MAX_SQLITE_TIMEOUT_MS
25
-
30
+
26
31
if timeout < 0 :
27
- raise ValueError ("timeout must be a non-negative number or -1" )
28
-
32
+ msg = "timeout must be a non-negative number or -1"
33
+ raise ValueError (msg )
34
+
29
35
if timeout > 0 :
30
- timeout = timeout - already_waited
31
- if timeout < 0 :
32
- timeout = 0
33
-
36
+ timeout -= already_waited
37
+ timeout = max (timeout , 0 )
38
+
34
39
assert timeout >= 0
35
40
36
41
timeout_ms = int (timeout * 1000 )
@@ -42,9 +47,16 @@ def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) ->
42
47
43
48
class _ReadWriteLockMeta (type ):
44
49
"""Metaclass that redirects instance creation to get_lock() when is_singleton=True."""
45
- def __call__ (cls , lock_file : str | os .PathLike [str ],
46
- timeout : float = - 1 , blocking : bool = True ,
47
- is_singleton : bool = True , * args : Any , ** kwargs : Any ) -> "ReadWriteLock" :
50
+
51
+ def __call__ (
52
+ cls ,
53
+ lock_file : str | os .PathLike [str ],
54
+ timeout : float = - 1 ,
55
+ blocking : bool = True ,
56
+ is_singleton : bool = True ,
57
+ * args : Any ,
58
+ ** kwargs : Any ,
59
+ ) -> ReadWriteLock :
48
60
if is_singleton :
49
61
return cls .get_lock (lock_file , timeout , blocking )
50
62
return super ().__call__ (lock_file , timeout , blocking , is_singleton , * args , ** kwargs )
@@ -56,16 +68,22 @@ class ReadWriteLock(metaclass=_ReadWriteLockMeta):
56
68
_instances_lock = threading .Lock ()
57
69
58
70
@classmethod
59
- def get_lock (cls , lock_file : str | os .PathLike [str ],
60
- timeout : float = - 1 , blocking : bool = True ) -> "ReadWriteLock" :
71
+ def get_lock (cls , lock_file : str | os .PathLike [str ], timeout : float = - 1 , blocking : bool = True ) -> ReadWriteLock :
61
72
"""Return the one-and-only ReadWriteLock for a given file."""
62
73
normalized = os .path .abspath (lock_file )
63
74
with cls ._instances_lock :
64
75
if normalized not in cls ._instances :
65
76
cls ._instances [normalized ] = cls (lock_file , timeout , blocking )
66
77
instance = cls ._instances [normalized ]
67
78
if instance .timeout != timeout or instance .blocking != blocking :
68
- raise ValueError ("Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s" , instance .timeout , instance .blocking , timeout , blocking )
79
+ msg = "Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s"
80
+ raise ValueError (
81
+ msg ,
82
+ instance .timeout ,
83
+ instance .blocking ,
84
+ timeout ,
85
+ blocking ,
86
+ )
69
87
return instance
70
88
71
89
def __init__ (
@@ -85,7 +103,7 @@ def __init__(
85
103
self ._internal_lock = threading .Lock ()
86
104
self ._lock_level = 0 # Reentrance counter.
87
105
# _current_mode holds the active lock mode ("read" or "write") or None if no lock is held.
88
- self ._current_mode : Literal ["read" , "write" , None ] = None
106
+ self ._current_mode : Literal ["read" , "write" ] | None = None
89
107
# _lock_level is the reentrance counter.
90
108
self ._lock_level = 0
91
109
self .con = sqlite3 .connect (self .lock_file , check_same_thread = False )
@@ -101,21 +119,25 @@ def __init__(
101
119
# acquire, so crashes cannot adversely affect the DB. Even journal_mode=OFF would probably
102
120
# be fine, too, but the SQLite documentation says that ROLLBACK becomes *undefined behaviour*
103
121
# with journal_mode=OFF which sounds scarier.
104
- self .con .execute (' PRAGMA journal_mode=MEMORY;' )
122
+ self .con .execute (" PRAGMA journal_mode=MEMORY;" )
105
123
106
124
def acquire_read (self , timeout : float = - 1 , blocking : bool = True ) -> AcquireReturnProxy :
107
- """Acquire a read lock. If a lock is already held, it must be a read lock.
108
- Upgrading from read to write is prohibited."""
109
-
125
+ """
126
+ Acquire a read lock. If a lock is already held, it must be a read lock.
127
+ Upgrading from read to write is prohibited.
128
+ """
110
129
# Attempt to re-enter already held lock.
111
130
with self ._internal_lock :
112
131
if self ._lock_level > 0 :
113
132
# Must already be in read mode.
114
133
if self ._current_mode != "read" :
115
- raise RuntimeError (
116
- f"Cannot acquire read lock on { self .lock_file } (lock id: { id (self )} ): "
117
- "already holding a write lock (downgrade not allowed)"
118
- )
134
+ msg = (
135
+ f"Cannot acquire read lock on { self .lock_file } (lock id: { id (self )} ): "
136
+ "already holding a write lock (downgrade not allowed)"
137
+ )
138
+ raise RuntimeError (
139
+ msg
140
+ )
119
141
self ._lock_level += 1
120
142
return AcquireReturnProxy (lock = self )
121
143
@@ -131,47 +153,54 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
131
153
with self ._internal_lock :
132
154
if self ._lock_level > 0 :
133
155
if self ._current_mode != "read" :
134
- raise RuntimeError (
156
+ msg = (
135
157
f"Cannot acquire read lock on { self .lock_file } (lock id: { id (self )} ): "
136
158
"already holding a write lock (downgrade not allowed)"
137
159
)
160
+ raise RuntimeError (
161
+ msg
162
+ )
138
163
self ._lock_level += 1
139
164
return AcquireReturnProxy (lock = self )
140
-
165
+
141
166
waited = time .perf_counter () - start_time
142
167
timeout_ms = timeout_for_sqlite (timeout , blocking , waited )
143
-
144
- self .con .execute (' PRAGMA busy_timeout=?;' , (timeout_ms ,))
145
- self .con .execute (' BEGIN TRANSACTION;' )
168
+
169
+ self .con .execute (" PRAGMA busy_timeout=?;" , (timeout_ms ,))
170
+ self .con .execute (" BEGIN TRANSACTION;" )
146
171
# Need to make SELECT to compel SQLite to actually acquire a SHARED db lock.
147
172
# See https://www.sqlite.org/lockingv3.html#transaction_control
148
- self .con .execute (' SELECT name from sqlite_schema LIMIT 1;' )
173
+ self .con .execute (" SELECT name from sqlite_schema LIMIT 1;" )
149
174
150
175
with self ._internal_lock :
151
176
self ._current_mode = "read"
152
177
self ._lock_level = 1
153
-
178
+
154
179
return AcquireReturnProxy (lock = self )
155
180
156
181
except sqlite3 .OperationalError as e :
157
- if ' database is locked' not in str (e ):
182
+ if " database is locked" not in str (e ):
158
183
raise # Re-raise unexpected errors.
159
184
raise Timeout (self .lock_file )
160
185
finally :
161
186
self ._transaction_lock .release ()
162
187
163
188
def acquire_write (self , timeout : float = - 1 , blocking : bool = True ) -> AcquireReturnProxy :
164
- """Acquire a write lock. If a lock is already held, it must be a write lock.
165
- Upgrading from read to write is prohibited."""
166
-
189
+ """
190
+ Acquire a write lock. If a lock is already held, it must be a write lock.
191
+ Upgrading from read to write is prohibited.
192
+ """
167
193
# Attempt to re-enter already held lock.
168
194
with self ._internal_lock :
169
195
if self ._lock_level > 0 :
170
196
if self ._current_mode != "write" :
171
- raise RuntimeError (
197
+ msg = (
172
198
f"Cannot acquire write lock on { self .lock_file } (lock id: { id (self )} ): "
173
199
"already holding a read lock (upgrade not allowed)"
174
200
)
201
+ raise RuntimeError (
202
+ msg
203
+ )
175
204
self ._lock_level += 1
176
205
return AcquireReturnProxy (lock = self )
177
206
@@ -185,27 +214,30 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe
185
214
with self ._internal_lock :
186
215
if self ._lock_level > 0 :
187
216
if self ._current_mode != "write" :
188
- raise RuntimeError (
217
+ msg = (
189
218
f"Cannot acquire write lock on { self .lock_file } (lock id: { id (self )} ): "
190
219
"already holding a read lock (upgrade not allowed)"
191
220
)
221
+ raise RuntimeError (
222
+ msg
223
+ )
192
224
self ._lock_level += 1
193
225
return AcquireReturnProxy (lock = self )
194
-
226
+
195
227
waited = time .perf_counter () - start_time
196
228
timeout_ms = timeout_for_sqlite (timeout , blocking , waited )
197
-
198
- self .con .execute (' PRAGMA busy_timeout=?;' , (timeout_ms ,))
199
- self .con .execute (' BEGIN EXCLUSIVE TRANSACTION;' )
229
+
230
+ self .con .execute (" PRAGMA busy_timeout=?;" , (timeout_ms ,))
231
+ self .con .execute (" BEGIN EXCLUSIVE TRANSACTION;" )
200
232
201
233
with self ._internal_lock :
202
234
self ._current_mode = "write"
203
235
self ._lock_level = 1
204
-
236
+
205
237
return AcquireReturnProxy (lock = self )
206
238
207
239
except sqlite3 .OperationalError as e :
208
- if ' database is locked' not in str (e ):
240
+ if " database is locked" not in str (e ):
209
241
raise # Re-raise if it is an unexpected error.
210
242
raise Timeout (self .lock_file )
211
243
finally :
@@ -216,7 +248,8 @@ def release(self, force: bool = False) -> None:
216
248
if self ._lock_level == 0 :
217
249
if force :
218
250
return
219
- raise RuntimeError (f"Cannot release a lock on { self .lock_file } (lock id: { id (self )} ) that is not held" )
251
+ msg = f"Cannot release a lock on { self .lock_file } (lock id: { id (self )} ) that is not held"
252
+ raise RuntimeError (msg )
220
253
if force :
221
254
self ._lock_level = 0
222
255
else :
@@ -233,10 +266,11 @@ def release(self, force: bool = False) -> None:
233
266
# (We provide two context managers as helpers.)
234
267
235
268
@contextmanager
236
- def read_lock (self , timeout : float | None = None ,
237
- blocking : bool | None = None ):
238
- """Context manager for acquiring a read lock.
239
- Attempts to upgrade to write lock are disallowed."""
269
+ def read_lock (self , timeout : float | None = None , blocking : bool | None = None ):
270
+ """
271
+ Context manager for acquiring a read lock.
272
+ Attempts to upgrade to write lock are disallowed.
273
+ """
240
274
if timeout is None :
241
275
timeout = self .timeout
242
276
if blocking is None :
@@ -248,10 +282,11 @@ def read_lock(self, timeout: float | None = None,
248
282
self .release ()
249
283
250
284
@contextmanager
251
- def write_lock (self , timeout : float | None = None ,
252
- blocking : bool | None = None ):
253
- """Context manager for acquiring a write lock.
254
- Acquiring read locks on the same file while helding a write lock is prohibited."""
285
+ def write_lock (self , timeout : float | None = None , blocking : bool | None = None ):
286
+ """
287
+ Context manager for acquiring a write lock.
288
+ Acquiring read locks on the same file while helding a write lock is prohibited.
289
+ """
255
290
if timeout is None :
256
291
timeout = self .timeout
257
292
if blocking is None :
@@ -261,9 +296,7 @@ def write_lock(self, timeout: float | None = None,
261
296
yield
262
297
finally :
263
298
self .release ()
264
-
299
+
265
300
def __del__ (self ) -> None :
266
301
"""Called when the lock object is deleted."""
267
302
self .release (force = True )
268
-
269
-
0 commit comments