Skip to content

Commit 5cac02b

Browse files
Extract sharding key from conditions
PR #181 introduced support of DDL sharding keys. But if sharding key hasn't got a separate index in schema, select with equal conditions for all required sharding key fields still led to map-reduce instead of a single storage call. This patch introduces impoved support of sharding keys extraction and fixes the issue. Closes #213
1 parent cc2e0e8 commit 5cac02b

File tree

7 files changed

+407
-58
lines changed

7 files changed

+407
-58
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2020

2121
* Use tuple-merger backed select implementation on tarantool 2.10+ (it gives
2222
less pressure on Lua GC).
23+
* DDL sharding key now can be extracted from select conditions even if
24+
there are no separate index.
2325

2426
## [0.9.0] - 20-10-21
2527

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ Current limitations for using custom sharding key:
101101
updated on storages, see
102102
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
103103
to do it manually with `require('crud.sharding_key').update_cache()`.
104-
- CRUD select may lead map reduce in some cases, see
105-
[#213](https://github.com/tarantool/crud/issues/213).
106104
- No support of JSON path for sharding key, see
107105
[#219](https://github.com/tarantool/crud/issues/219).
108106
- `primary_index_fieldno_map` is not cached, see

crud/select/plan.lua

Lines changed: 79 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ if has_keydef then
1212
keydef_lib = compat.require('tuple.keydef', 'key_def')
1313
end
1414

15-
local select_plan = {}
15+
local select_plan = {
16+
_internal = {},
17+
}
1618

1719
local IndexTypeError = errors.new_class('IndexTypeError', {capture_stack = false})
1820
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
@@ -48,49 +50,92 @@ local function get_index_for_condition(space_indexes, space_format, condition)
4850
end
4951
end
5052

51-
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
52-
if #scan_value < #sharding_index.parts then
53-
return nil
54-
end
53+
function select_plan._internal.extract_sharding_key_from_conditions(conditions, sharding_index,
54+
space_indexes, fieldno_map)
55+
dev_checks('table', 'table', 'table', 'table')
5556

56-
if scan_index.id == sharding_index.id then
57-
return scan_value
58-
end
57+
-- If name is both valid index name and field name,
58+
-- it is interpreted as index name.
59+
local filled_fields = {}
60+
for _, condition in ipairs(conditions) do
61+
if condition.operator ~= compare_conditions.operators.EQ then
62+
goto continue
63+
end
5964

60-
local scan_value_fields_values = {}
61-
for i, scan_index_part in ipairs(scan_index.parts) do
62-
scan_value_fields_values[scan_index_part.fieldno] = scan_value[i]
63-
end
65+
local index = space_indexes[condition.operand]
66+
if index ~= nil then
67+
for i, part in ipairs(index.parts) do
68+
-- Consider the following case:
69+
-- index_0: {'foo', 'bar'},
70+
-- index_1: {'baz', 'bar'},
71+
-- conditions: {{'==', 'index_0', {1, 2}}, {'==', 'index_1', {3, nil}}}.
72+
-- To check that nil parts will not overwrite already filled_fields,
73+
-- we verify that filled_fields[part.fieldno] is empty. If there are
74+
-- more than one non-null different value in conditions with equal operator,
75+
-- request is already in conflict and it doesn't matter what sharding key we
76+
-- will return.
77+
if filled_fields[part.fieldno] == nil then
78+
filled_fields[part.fieldno] = condition.values[i]
79+
end
80+
end
6481

65-
-- check that sharding key is included in the scan index fields
66-
local sharding_key = {}
67-
for _, sharding_key_part in ipairs(sharding_index.parts) do
68-
local fieldno = sharding_key_part.fieldno
82+
goto continue
83+
end
6984

70-
-- sharding key isn't included in scan key
71-
if scan_value_fields_values[fieldno] == nil then
72-
return nil
85+
local fieldno = fieldno_map[condition.operand]
86+
if fieldno == nil then
87+
goto continue
7388
end
89+
filled_fields[fieldno] = condition.values[1]
7490

75-
local field_value = scan_value_fields_values[fieldno]
91+
::continue::
92+
end
7693

77-
-- sharding key contains nil values
78-
if field_value == nil then
94+
local sharding_key = {}
95+
for i, v in ipairs(sharding_index.parts) do
96+
if filled_fields[v.fieldno] == nil then
7997
return nil
8098
end
8199

82-
table.insert(sharding_key, field_value)
100+
sharding_key[i] = filled_fields[v.fieldno]
83101
end
84102

85103
return sharding_key
86104
end
87105

106+
function select_plan._internal.get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index)
107+
dev_checks('?', 'table', 'number', 'table')
108+
109+
if scan_value == nil then
110+
return nil
111+
end
112+
113+
if scan_iter ~= box.index.EQ and scan_iter ~= box.index.REQ then
114+
return nil
115+
end
116+
117+
if scan_index.id == sharding_index.id then
118+
if type(scan_value) ~= 'table' then
119+
return scan_value
120+
end
121+
122+
for i, _ in ipairs(sharding_index.parts) do
123+
if scan_value[i] == nil then return nil end
124+
end
125+
return scan_value
126+
end
127+
128+
return nil
129+
end
130+
88131
-- We need to construct after_tuple by field_names
89132
-- because if `fields` option is specified we have after_tuple with partial fields
90133
-- and these fields are ordered by field_names + primary key + scan key
91134
-- this order can be differ from order in space format
92135
-- so we need to cast after_tuple to space format for scrolling tuples on storage
93-
local function construct_after_tuple_by_fields(space_format, field_names, tuple)
136+
local function construct_after_tuple_by_fields(fieldno_map, field_names, tuple)
137+
dev_checks('?table', '?table', '?table|cdata')
138+
94139
if tuple == nil then
95140
return nil
96141
end
@@ -99,15 +144,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple)
99144
return tuple
100145
end
101146

102-
local positions = {}
103147
local transformed_tuple = {}
104148

105-
for i, field in ipairs(space_format) do
106-
positions[field.name] = i
107-
end
108-
109149
for i, field_name in ipairs(field_names) do
110-
local fieldno = positions[field_name]
150+
local fieldno = fieldno_map[field_name]
111151
if fieldno == nil then
112152
return nil, FilterFieldsError:new(
113153
'Space format doesn\'t contain field named %q', field_name
@@ -145,6 +185,8 @@ function select_plan.new(space, conditions, opts)
145185
local scan_value
146186
local scan_condition_num
147187

188+
local fieldno_map = utils.get_format_fieldno_map(space_format)
189+
148190
-- search index to iterate over
149191
for i, condition in ipairs(conditions) do
150192
scan_index = get_index_for_condition(space_indexes, space_format, condition)
@@ -176,9 +218,7 @@ function select_plan.new(space, conditions, opts)
176218

177219
-- handle opts.first
178220
local total_tuples_count
179-
local scan_after_tuple, err = construct_after_tuple_by_fields(
180-
space_format, field_names, opts.after_tuple
181-
)
221+
local scan_after_tuple, err = construct_after_tuple_by_fields(fieldno_map, field_names, opts.after_tuple)
182222
if err ~= nil then
183223
return nil, err
184224
end
@@ -230,9 +270,12 @@ function select_plan.new(space, conditions, opts)
230270
local sharding_index = opts.sharding_key_as_index_obj or primary_index
231271

232272
-- get sharding key value
233-
local sharding_key
234-
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
235-
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
273+
local sharding_key = select_plan._internal.get_sharding_key_from_scan_value(scan_value, scan_index,
274+
scan_iter, sharding_index)
275+
276+
if sharding_key == nil then
277+
sharding_key = select_plan._internal.extract_sharding_key_from_conditions(conditions, sharding_index,
278+
space_indexes, fieldno_map)
236279
end
237280

238281
local plan = {

test/entrypoint/srv_ddl.lua

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ package.preload['customers-storage'] = function()
6161
{path = 'name', is_nullable = false, type = 'string'},
6262
},
6363
}
64+
local age_index = {
65+
name = 'age',
66+
type = 'TREE',
67+
unique = false,
68+
parts = {
69+
{path = 'age', is_nullable = false, type = 'number'},
70+
},
71+
}
6472
local secondary_index = {
6573
name = 'secondary',
6674
type = 'TREE',
@@ -71,6 +79,17 @@ package.preload['customers-storage'] = function()
7179
},
7280
}
7381

82+
local three_fields_index = {
83+
name = 'three_fields',
84+
type = 'TREE',
85+
unique = false,
86+
parts = {
87+
{path = 'age', is_nullable = false, type = 'number'},
88+
{path = 'name', is_nullable = false, type = 'string'},
89+
{path = 'id', is_nullable = false, type = 'unsigned'},
90+
},
91+
}
92+
7493
local customers_name_key_schema = table.deepcopy(customers_schema)
7594
customers_name_key_schema.sharding_key = {'name'}
7695
table.insert(customers_name_key_schema.indexes, primary_index)
@@ -100,13 +119,27 @@ package.preload['customers-storage'] = function()
100119
table.insert(customers_age_key_schema.indexes, primary_index)
101120
table.insert(customers_age_key_schema.indexes, bucket_id_index)
102121

122+
local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema)
123+
customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'}
124+
table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index)
125+
table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index)
126+
table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index)
127+
128+
local customers_name_age_key_three_fields_index_schema = table.deepcopy(customers_schema)
129+
customers_name_age_key_three_fields_index_schema.sharding_key = {'name', 'age'}
130+
table.insert(customers_name_age_key_three_fields_index_schema.indexes, primary_index_id)
131+
table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index)
132+
table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index)
133+
103134
local schema = {
104135
spaces = {
105136
customers_name_key = customers_name_key_schema,
106137
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
107138
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
108139
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
109140
customers_age_key = customers_age_key_schema,
141+
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
142+
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
110143
}
111144
}
112145

test/helpers/storage_stat.lua

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,16 @@ function storage_stat.diff(a, b)
9595
return diff
9696
end
9797

98+
-- Accepts collect (or diff) return value and returns
99+
-- total number of select requests across all storages.
100+
function storage_stat.total(stats)
101+
local total = 0
102+
103+
for _, stat in pairs(stats) do
104+
total = total + (stat.select_requests or 0)
105+
end
106+
107+
return total
108+
end
109+
98110
return storage_stat

0 commit comments

Comments
 (0)