Skip to content

Commit be073b3

Browse files
committed
Copy the content of sharding_key.lua file to sharding_metadata.lua file
PR #181 introduced support of DDL sharding keys. Implementation of sharding keys support contains methods that are common to support sharding keys and sharding functions. That's why a separate file `sharding_metadata.lua` was created to contain common methods. In this commit content of `sharding_key.lua` file is coppied to `sharding_metadata.lua` file to simplify a reviewer's life and display the history of changes relative to PR #181 in the following commits. Part of #237
1 parent f6d00a9 commit be073b3

File tree

11 files changed

+261
-19
lines changed

11 files changed

+261
-19
lines changed

crud.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ local select = require('crud.select')
1212
local truncate = require('crud.truncate')
1313
local len = require('crud.len')
1414
local borders = require('crud.borders')
15-
local sharding_key = require('crud.common.sharding_key')
15+
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
1616
local utils = require('crud.common.utils')
1717

1818
local crud = {}
@@ -114,7 +114,7 @@ function crud.init_storage()
114114
truncate.init()
115115
len.init()
116116
borders.init()
117-
sharding_key.init()
117+
sharding_metadata.init()
118118
end
119119

120120
function crud.init_router()

crud/common/sharding/init.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7-
local sharding_key_module = require('crud.common.sharding_key')
7+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
88

99
local sharding = {}
1010

@@ -22,7 +22,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
2222
end
2323

2424
local sharding_index_parts = space.index[0].parts
25-
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name)
25+
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name)
2626
if err ~= nil then
2727
return nil, err
2828
end
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
local fiber = require('fiber')
2+
local errors = require('errors')
3+
4+
local call = require('crud.common.call')
5+
local const = require('crud.common.const')
6+
local dev_checks = require('crud.common.dev_checks')
7+
local cache = require('crud.common.sharding_key_cache')
8+
local utils = require('crud.common.utils')
9+
10+
local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
11+
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
12+
local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false})
13+
14+
local FETCH_FUNC_NAME = '_crud.fetch_on_storage'
15+
16+
local sharding_key_module = {}
17+
18+
-- Function decorator that is used to prevent _fetch_on_router() from being
19+
-- called concurrently by different fibers.
20+
local function locked(f)
21+
dev_checks('function')
22+
23+
return function(timeout, ...)
24+
local timeout_deadline = fiber.clock() + timeout
25+
local ok = cache.fetch_lock:put(true, timeout)
26+
-- channel:put() returns false in two cases: when timeout is exceeded
27+
-- or channel has been closed. However error message describes only
28+
-- first reason, I'm not sure we need to disclose to users such details
29+
-- like problems with synchronization objects.
30+
if not ok then
31+
return FetchShardingKeyError:new(
32+
"Timeout for fetching sharding key is exceeded")
33+
end
34+
local timeout = timeout_deadline - fiber.clock()
35+
local status, err = pcall(f, timeout, ...)
36+
cache.fetch_lock:get()
37+
if not status or err ~= nil then
38+
return err
39+
end
40+
end
41+
end
42+
43+
-- Build a structure similar to index, but it is not a real index object,
44+
-- it contains only parts key with fieldno's.
45+
local function as_index_object(space_name, space_format, sharding_key_def)
46+
dev_checks('string', 'table', 'table')
47+
48+
local fieldnos = {}
49+
local fieldno_map = utils.get_format_fieldno_map(space_format)
50+
for _, field_name in ipairs(sharding_key_def) do
51+
local fieldno = fieldno_map[field_name]
52+
if fieldno == nil then
53+
return nil, WrongShardingConfigurationError:new(
54+
"No such field (%s) in a space format (%s)", field_name, space_name)
55+
end
56+
table.insert(fieldnos, {fieldno = fieldno})
57+
end
58+
59+
return {parts = fieldnos}
60+
end
61+
62+
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
63+
-- not available on storage.
64+
function sharding_key_module.fetch_on_storage()
65+
local sharding_key_space = box.space._ddl_sharding_key
66+
if sharding_key_space == nil then
67+
return nil
68+
end
69+
70+
local SPACE_NAME_FIELDNO = 1
71+
local SPACE_SHARDING_KEY_FIELDNO = 2
72+
local metadata_map = {}
73+
for _, tuple in sharding_key_space:pairs() do
74+
local space_name = tuple[SPACE_NAME_FIELDNO]
75+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
76+
local space_format = box.space[space_name]:format()
77+
metadata_map[space_name] = {
78+
sharding_key_def = sharding_key_def,
79+
space_format = space_format,
80+
}
81+
end
82+
83+
return metadata_map
84+
end
85+
86+
-- Under high load we may get a case when more than one fiber will fetch
87+
-- metadata from storages. It is not good from performance point of view.
88+
-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches
89+
-- a sharding metadata by a single one, other fibers will wait while
90+
-- cache.fetch_lock become unlocked during timeout passed to
91+
-- _fetch_on_router().
92+
local _fetch_on_router = locked(function(timeout)
93+
dev_checks('number')
94+
95+
if cache.sharding_key_as_index_obj_map ~= nil then
96+
return
97+
end
98+
99+
local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, {
100+
timeout = timeout
101+
})
102+
if err ~= nil then
103+
return err
104+
end
105+
if metadata_map == nil then
106+
cache.sharding_key_as_index_obj_map = {}
107+
return
108+
end
109+
110+
cache.sharding_key_as_index_obj_map = {}
111+
for space_name, metadata in pairs(metadata_map) do
112+
local sharding_key_as_index_obj, err = as_index_object(space_name,
113+
metadata.space_format,
114+
metadata.sharding_key_def)
115+
if err ~= nil then
116+
return err
117+
end
118+
cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
119+
end
120+
end)
121+
122+
-- Get sharding index for a certain space.
123+
--
124+
-- Return:
125+
-- - sharding key as index object, when sharding key definition found on
126+
-- storage.
127+
-- - nil, when sharding key definition was not found on storage. Pay attention
128+
-- that nil without error is a successfull return value.
129+
-- - nil and error, when something goes wrong on fetching attempt.
130+
--
131+
function sharding_key_module.fetch_on_router(space_name, timeout)
132+
dev_checks('string', '?number')
133+
134+
if cache.sharding_key_as_index_obj_map ~= nil then
135+
return cache.sharding_key_as_index_obj_map[space_name]
136+
end
137+
138+
local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT
139+
local err = _fetch_on_router(timeout)
140+
if err ~= nil then
141+
if cache.sharding_key_as_index_obj_map ~= nil then
142+
return cache.sharding_key_as_index_obj_map[space_name]
143+
end
144+
return nil, err
145+
end
146+
147+
if cache.sharding_key_as_index_obj_map ~= nil then
148+
return cache.sharding_key_as_index_obj_map[space_name]
149+
end
150+
151+
return nil, FetchShardingKeyError:new(
152+
"Fetching sharding key for space '%s' is failed", space_name)
153+
end
154+
155+
function sharding_key_module.update_cache(space_name)
156+
cache.drop_caches()
157+
return sharding_key_module.fetch_on_router(space_name)
158+
end
159+
160+
-- Make sure sharding key definition is a part of primary key.
161+
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
162+
dev_checks('string', 'table', 'table')
163+
164+
if cache.is_part_of_pk[space_name] ~= nil then
165+
return cache.is_part_of_pk[space_name]
166+
end
167+
168+
local is_part_of_pk = true
169+
local pk_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)
170+
for _, part in ipairs(sharding_key_as_index_obj.parts) do
171+
if pk_fieldno_map[part.fieldno] == nil then
172+
is_part_of_pk = false
173+
break
174+
end
175+
end
176+
cache.is_part_of_pk[space_name] = is_part_of_pk
177+
178+
return is_part_of_pk
179+
end
180+
181+
-- Build an array with sharding key values. Function extracts those values from
182+
-- primary key that are part of sharding key (passed as index object).
183+
local function extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
184+
dev_checks('table', 'table', 'table')
185+
186+
-- TODO: extract_from_index() calculates primary_index_parts on each
187+
-- request. It is better to cache it's value.
188+
-- https://github.com/tarantool/crud/issues/243
189+
local primary_index_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)
190+
191+
local sharding_key = {}
192+
for _, part in ipairs(sharding_key_as_index_obj.parts) do
193+
-- part_number cannot be nil because earlier we checked that tuple
194+
-- field names defined in sharding key definition are part of primary
195+
-- key.
196+
local part_number = primary_index_fieldno_map[part.fieldno]
197+
assert(part_number ~= nil)
198+
local field_value = primary_key[part_number]
199+
table.insert(sharding_key, field_value)
200+
end
201+
202+
return sharding_key
203+
end
204+
205+
-- Extract sharding key from pk.
206+
-- Returns a table with sharding key or pair of nil and error.
207+
function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout)
208+
dev_checks('string', 'table', '?', '?number')
209+
210+
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name, timeout)
211+
if err ~= nil then
212+
return nil, err
213+
end
214+
if sharding_key_as_index_obj == nil then
215+
return primary_key
216+
end
217+
218+
local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
219+
if res == false then
220+
return nil, ShardingKeyError:new(
221+
"Sharding key for space %q is missed in primary index, specify bucket_id",
222+
space_name
223+
)
224+
end
225+
if type(primary_key) ~= 'table' then
226+
primary_key = {primary_key}
227+
end
228+
229+
return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
230+
end
231+
232+
function sharding_key_module.init()
233+
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
234+
end
235+
236+
sharding_key_module.internal = {
237+
as_index_object = as_index_object,
238+
extract_from_index = extract_from_index,
239+
is_part_of_pk = is_part_of_pk,
240+
}
241+
242+
return sharding_key_module

crud/delete.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding.init')
8-
local sharding_key_module = require('crud.common.sharding_key')
8+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
1111

@@ -60,7 +60,7 @@ local function call_delete_on_router(space_name, key, opts)
6060
if opts.bucket_id == nil then
6161
local err
6262
local primary_index_parts = space.index[0].parts
63-
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
63+
sharding_key, err = sharding_metadata_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
6464
if err ~= nil then
6565
return nil, err
6666
end

crud/get.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding.init')
8-
local sharding_key_module = require('crud.common.sharding_key')
8+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
1111

@@ -63,7 +63,7 @@ local function call_get_on_router(space_name, key, opts)
6363
if opts.bucket_id == nil then
6464
local err
6565
local primary_index_parts = space.index[0].parts
66-
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
66+
sharding_key, err = sharding_metadata_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
6767
if err ~= nil then
6868
return nil, err
6969
end

crud/select/compat/select.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ local sharding = require('crud.common.sharding.init')
77
local dev_checks = require('crud.common.dev_checks')
88
local common = require('crud.select.compat.common')
99
local schema = require('crud.common.schema')
10-
local sharding_key_module = require('crud.common.sharding_key')
10+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1111

1212
local compare_conditions = require('crud.compare.conditions')
1313
local select_plan = require('crud.select.plan')
@@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
5151
return nil, SelectError:new("Space %q doesn't exist", space_name), true
5252
end
5353
local space_format = space:format()
54-
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
54+
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
5555
if err ~= nil then
5656
return nil, err
5757
end

crud/select/compat/select_old.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ local utils = require('crud.common.utils')
88
local sharding = require('crud.common.sharding.init')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
11-
local sharding_key_module = require('crud.common.sharding_key')
11+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
1212

1313
local compare_conditions = require('crud.compare.conditions')
1414
local select_plan = require('crud.select.plan')
@@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
103103
return nil, SelectError:new("Space %q doesn't exist", space_name), true
104104
end
105105
local space_format = space:format()
106-
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
106+
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
107107
if err ~= nil then
108108
return nil, err
109109
end

crud/update.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ local vshard = require('vshard')
55
local call = require('crud.common.call')
66
local utils = require('crud.common.utils')
77
local sharding = require('crud.common.sharding.init')
8-
local sharding_key_module = require('crud.common.sharding_key')
8+
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
99
local dev_checks = require('crud.common.dev_checks')
1010
local schema = require('crud.common.schema')
1111

@@ -88,7 +88,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
8888
if opts.bucket_id == nil then
8989
local err
9090
local primary_index_parts = space.index[0].parts
91-
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
91+
sharding_key, err = sharding_metadata_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
9292
if err ~= nil then
9393
return nil, err
9494
end

test/helper.lua

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,10 @@ end
325325

326326
function helpers.update_cache(cluster, space_name)
327327
return cluster.main_server.net_box:eval([[
328-
local sharding_key = require('crud.common.sharding_key')
328+
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
329329
330330
local space_name = ...
331-
return sharding_key.update_cache(space_name)
331+
return sharding_metadata.update_cache(space_name)
332332
]], {space_name})
333333
end
334334

test/unit/sharding_key_test.lua renamed to test/unit/sharding_metadata_test.lua

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
local t = require('luatest')
2-
local sharding_key_module = require('crud.common.sharding_key')
3-
local cache = require('crud.common.sharding_key_cache')
2+
local sharding_key_module = require('crud.common.sharding.sharding_metadata')
3+
local cache = require('crud.common.sharding.sharding_metadata_cache')
44
local utils = require('crud.common.utils')
55

66
local helpers = require('test.helper')
77

8-
local g = t.group('sharding_key')
8+
local g = t.group('sharding_metadata')
99

1010
g.before_each(function()
1111
local sharding_key_format = {

0 commit comments

Comments
 (0)