Use sharding keys to calculate bucket id #181

Merged 8 commits on Nov 27, 2021.
2 changes: 1 addition & 1 deletion .github/workflows/test_on_push.yaml
@@ -18,7 +18,7 @@ jobs:
         include:
           - tarantool-version: "2.7"
             remove-merger: true
-          - tarantool-version: "2.7"
+          - tarantool-version: "2.8"
             coveralls: true
       fail-fast: false
     runs-on: [ubuntu-latest]
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

* CRUD operations calculate the bucket id automatically using the sharding
  key specified in the DDL schema or in the `_ddl_sharding_key` space.
  NOTE: CRUD methods delete(), get() and update() require the sharding key
  to be a part of the primary key.

### Changed

### Fixed
59 changes: 54 additions & 5 deletions README.md
@@ -53,11 +53,60 @@ crud.unflatten_rows(res.rows, res.metadata)
**Notes:**

* A space should have a format.
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
  where `key` is the primary key value.
  A custom bucket ID can be specified as `opts.bucket_id` for each operation.
  For operations that accept a tuple/object, the bucket ID can be specified as a
  tuple/object field as well as the `opts.bucket_id` value.

**Sharding key and bucket id calculation**

*Sharding key* is a set of tuple field values used to calculate the *bucket ID*.
*Sharding key definition* is a set of tuple field names that describes which
tuple fields make up the sharding key. *Bucket ID* determines which
replicaset stores a given piece of data. The function used to calculate the
bucket ID is called the *sharding function*.
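
As a rough illustration of these terms, the sketch below extracts a sharding key from a tuple and maps it to a bucket ID. It is plain Lua and rests on several assumptions: the space format, the `name`/`age` sharding key definition and the helpers `build_fieldno_map`, `extract_sharding_key` and `toy_sharding_func` are hypothetical, and the hash is a toy stand-in rather than the `vshard.router.bucket_id_strcrc32` function that CRUD uses by default.

```lua
-- Space format: field names in tuple order (hypothetical example space).
local format = {'id', 'bucket_id', 'name', 'age'}

-- Sharding key definition: names of the fields that form the sharding key.
local sharding_key_def = {'name', 'age'}

-- Build a "field name -> field position" map from the format.
local function build_fieldno_map(space_format)
    local map = {}
    for fieldno, name in ipairs(space_format) do
        map[name] = fieldno
    end
    return map
end

-- Sharding key: the values of the fields listed in the definition.
local function extract_sharding_key(tuple, key_def, fieldno_map)
    local key = {}
    for i, name in ipairs(key_def) do
        key[i] = tuple[fieldno_map[name]]
    end
    return key
end

-- Toy sharding function (NOT crc32): maps a key to a bucket ID
-- in the range 1..bucket_count.
local function toy_sharding_func(key, bucket_count)
    local hash = 0
    for _, value in ipairs(key) do
        local str = tostring(value)
        for i = 1, #str do
            hash = (hash * 31 + str:byte(i)) % 2147483647
        end
    end
    return hash % bucket_count + 1
end

-- The bucket_id field (position 2) is not set yet.
local tuple = {1, nil, 'Elizabeth', 23}
local fieldno_map = build_fieldno_map(format)
local sharding_key = extract_sharding_key(tuple, sharding_key_def, fieldno_map)
local bucket_id = toy_sharding_func(sharding_key, 3000)
```

The same sharding key always yields the same bucket ID, which is what lets a router find the replicaset that stores a tuple without asking every storage.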

By default, CRUD calculates the bucket ID from the primary key using the
function `vshard.router.bucket_id_strcrc32(key)`. This happens automatically
and requires no action from the user. However, for operations that accept a
tuple/object, the bucket ID can be specified as a tuple/object field as well as
the `opts.bucket_id` value.

Starting from 0.10.0, users who don't want to use the primary key as a sharding
key can set a custom sharding key definition as part of the [DDL
schema](https://github.com/tarantool/ddl#input-data-format) or insert it
manually into the `_ddl_sharding_key` space (in both cases, consult the DDL
module documentation). As soon as a sharding key for a given space is available
in the `_ddl_sharding_key` space, CRUD uses it for bucket ID calculation
automatically. Note that the CRUD methods `delete()`, `get()` and `update()`
require the sharding key to be a part of the primary key.
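
A minimal sketch of what such a schema fragment might look like, together with a check of the primary key constraint. The layout follows the DDL input data format as far as it is recalled here, so verify it against the DDL documentation; the `customers` space, its fields and the helper `sharding_key_in_primary_key` are hypothetical and not part of CRUD or DDL.

```lua
-- Hypothetical DDL schema fragment with a custom sharding key.
local schema = {
    spaces = {
        customers = {
            engine = 'memtx',
            is_local = false,
            temporary = false,
            format = {
                {name = 'id', type = 'unsigned', is_nullable = false},
                {name = 'bucket_id', type = 'unsigned', is_nullable = false},
                {name = 'name', type = 'string', is_nullable = false},
                {name = 'age', type = 'number', is_nullable = false},
            },
            indexes = {
                {
                    name = 'primary', type = 'TREE', unique = true,
                    parts = {
                        {path = 'id', type = 'unsigned', is_nullable = false},
                        {path = 'name', type = 'string', is_nullable = false},
                    },
                },
                {
                    name = 'bucket_id', type = 'TREE', unique = false,
                    parts = {
                        {path = 'bucket_id', type = 'unsigned', is_nullable = false},
                    },
                },
            },
            -- Custom sharding key definition: shard by customer name.
            sharding_key = {'name'},
        },
    },
}

-- Returns true if every sharding key field is a part of the primary
-- (first) index; otherwise returns false and the offending field name.
local function sharding_key_in_primary_key(space_schema)
    local pk_fields = {}
    for _, part in ipairs(space_schema.indexes[1].parts) do
        pk_fields[part.path] = true
    end
    for _, field in ipairs(space_schema.sharding_key) do
        if not pk_fields[field] then
            return false, field
        end
    end
    return true
end

-- 'name' is a primary key part, so get(), delete() and update()
-- could derive the sharding key from the primary key value.
local ok = sharding_key_in_primary_key(schema.spaces.customers)
```

With `sharding_key = {'age'}` the check would fail, because `age` cannot be recovered from a primary key value alone.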

The table below describes which operations support a custom sharding key:

| CRUD method | Sharding key support |
| -------------------------------- | -------------------------- |
| `get()` | Yes |
| `insert()` / `insert_object()` | Yes |
| `delete()` | Yes |
| `replace()` / `replace_object()` | Yes |
| `upsert()` / `upsert_object()` | Yes |
| `select()` / `pairs()` | Yes |
| `update()` | Yes |
| `min()` / `max()` | No (not required) |
| `cut_rows()` / `cut_objects()` | No (not required) |
| `truncate()` | No (not required) |
| `len()` | No (not required) |

Current limitations of custom sharding keys:

- Sharding keys are not updated automatically when the schema is updated on
  storages, see [#212](https://github.com/tarantool/crud/issues/212). However,
  they can be updated manually with
  `require('crud.common.sharding_key').update_cache()`.
- CRUD select may lead to a map-reduce in some cases, see
  [#213](https://github.com/tarantool/crud/issues/213).
- JSON paths are not supported in sharding keys, see
  [#219](https://github.com/tarantool/crud/issues/219).
- `primary_index_fieldno_map` is not cached, see
  [#243](https://github.com/tarantool/crud/issues/243).

### Insert

2 changes: 2 additions & 0 deletions crud.lua
@@ -12,6 +12,7 @@ local select = require('crud.select')
local truncate = require('crud.truncate')
local len = require('crud.len')
local borders = require('crud.borders')
local sharding_key = require('crud.common.sharding_key')
local utils = require('crud.common.utils')

local crud = {}
@@ -113,6 +114,7 @@ function crud.init_storage()
    truncate.init()
    len.init()
    borders.init()
    sharding_key.init()
end

function crud.init_router()
27 changes: 27 additions & 0 deletions crud/common/call.lua
@@ -145,4 +145,31 @@ function call.single(bucket_id, func_name, func_args, opts)
    return res
end

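-- Calls func_name on an arbitrary replicaset: the first one found in the
-- table returned by vshard.router.routeall().
function call.any(func_name, func_args, opts)
    dev_checks('string', '?table', {
        timeout = '?number',
    })

    -- Defensive default: avoids indexing nil if opts is omitted.
    opts = opts or {}
    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT

    local replicasets, err = vshard.router.routeall()
    if replicasets == nil then
        return nil, CallError:new("Failed to get all replicasets: %s", err.err)
    end
    -- Take the value of the first (uuid, replicaset) pair.
    local replicaset = select(2, next(replicasets))

    local res, err = replicaset:call(func_name, func_args, {
        timeout = timeout,
    })
    if err ~= nil then
        return nil, wrap_vshard_err(err, func_name, replicaset.uuid)
    end

    -- Normalize box.NULL to a plain nil for callers.
    if res == box.NULL then
        return nil
    end

    return res
end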

return call
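
The `select(2, next(replicasets))` idiom in `call.any` takes the value of whichever key-value pair `next` returns first. A minimal standalone sketch of that idiom, assuming made-up replicaset stand-ins keyed by UUID-like strings and a hypothetical `pick_any` helper:

```lua
-- Returns an arbitrary value of a key-value table, or nil if it is empty.
local function pick_any(map)
    local _, value = next(map)
    return value
end

-- Stand-ins for replicaset objects keyed by UUID (made-up data).
local replicasets = {
    ['uuid-1'] = {name = 'storage-1'},
    ['uuid-2'] = {name = 'storage-2'},
}

local chosen = pick_any(replicasets)

-- Same result as the one-liner used in call.any.
local same = select(2, next(replicasets))

-- An empty table yields nil, so a caller should check before
-- invoking methods on the result.
local none = pick_any({})
```

Which replicaset is chosen depends on table iteration order and is not specified, which is acceptable when any storage can answer the request.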
1 change: 1 addition & 0 deletions crud/common/const.lua
@@ -2,5 +2,6 @@ local const = {}

const.RELOAD_RETRIES_NUM = 1
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds

return const
22 changes: 18 additions & 4 deletions crud/common/sharding.lua
@@ -4,6 +4,7 @@ local errors = require('errors')
 local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})

 local utils = require('crud.common.utils')
+local sharding_key_module = require('crud.common.sharding_key')

 local sharding = {}

@@ -20,7 +21,16 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
         return specified_bucket_id
     end

-    local key = utils.extract_key(tuple, space.index[0].parts)
+    local sharding_index_parts = space.index[0].parts
+    local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name)
+    if err ~= nil then
+        return nil, err
+    end
+    if sharding_key_as_index_obj ~= nil then
+        sharding_index_parts = sharding_key_as_index_obj.parts
+    end
+    local key = utils.extract_key(tuple, sharding_index_parts)
+
     return sharding.key_get_bucket_id(key)
 end

@@ -43,11 +53,15 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
         end
     end

-    if tuple[bucket_id_fieldno] == nil then
-        tuple[bucket_id_fieldno] = sharding.tuple_get_bucket_id(tuple, space)
+    local bucket_id = tuple[bucket_id_fieldno]
+    if bucket_id == nil then
+        bucket_id, err = sharding.tuple_get_bucket_id(tuple, space)
+        if err ~= nil then
+            return nil, err
+        end
+        tuple[bucket_id_fieldno] = bucket_id
     end

-    local bucket_id = tuple[bucket_id_fieldno]
     return bucket_id
 end
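
The hunks above change how a bucket ID is resolved for a tuple. A simplified sketch of that resolution order in plain Lua, under stated assumptions: `resolve_bucket_id`, the layout of the `space` table and `toy_bucket_id` are hypothetical stand-ins (the real code calls `sharding_key_module.fetch_on_router` and `sharding.key_get_bucket_id`), and error handling as well as writing the result back into the tuple are omitted.

```lua
-- Toy stand-in for the sharding function (NOT crc32).
local function toy_bucket_id(key, bucket_count)
    local sum = 0
    for _, value in ipairs(key) do
        local str = tostring(value)
        for i = 1, #str do
            sum = sum + str:byte(i)
        end
    end
    return sum % bucket_count + 1
end

-- Collects the tuple fields referenced by index-like parts.
local function extract_key(tuple, parts)
    local key = {}
    for i, part in ipairs(parts) do
        key[i] = tuple[part.fieldno]
    end
    return key
end

local function resolve_bucket_id(tuple, space, specified_bucket_id)
    -- 1. An explicitly passed bucket ID wins.
    if specified_bucket_id ~= nil then
        return specified_bucket_id
    end
    -- 2. Next, a bucket ID already stored in the tuple.
    local bucket_id = tuple[space.bucket_id_fieldno]
    if bucket_id ~= nil then
        return bucket_id
    end
    -- 3. Otherwise compute it: use the custom sharding key parts if
    --    they are defined, and fall back to the primary key parts.
    local parts = space.primary_key_parts
    if space.sharding_key_parts ~= nil then
        parts = space.sharding_key_parts
    end
    return toy_bucket_id(extract_key(tuple, parts), space.bucket_count)
end

-- Hypothetical space description: primary key on field 1,
-- custom sharding key on field 3.
local space = {
    bucket_id_fieldno = 2,
    bucket_count = 3000,
    primary_key_parts = {{fieldno = 1}},
    sharding_key_parts = {{fieldno = 3}},
}
local tuple = {1, nil, 'Elizabeth', 23}
local computed = resolve_bucket_id(tuple, space)
```

Falling back to the primary key parts keeps spaces without a custom sharding key working exactly as before.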
