Commit 9793f84

Merge pull request #174 from kitagry/add-range-paritioned
feat: Add range partitioning support
2 parents c265236 + 068acba commit 9793f84

File tree

5 files changed (+153 −1 lines changed)


README.md

Lines changed: 24 additions & 0 deletions

@@ -110,6 +110,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
 | time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
 | time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
 | time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
+| range_partitioning | hash | optional | nil | See [Range Partitioning](#range-partitioning) |
+| range_partitioning.field | string | required | nil | `INT64` column used for partitioning |
+| range_partitioning.range | hash | required | nil | Defines the ranges for range partitioning |
+| range_partitioning.range.start | int | required | nil | The start of range partitioning, inclusive. |
+| range_partitioning.range.end | int | required | nil | The end of range partitioning, exclusive. |
+| range_partitioning.range.interval | int | required | nil | The width of each interval. |
 | clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
 | clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
 | schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
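The `start`/`end`/`interval` semantics in the table above can be illustrated with a short Ruby sketch (illustrative only, not part of the plugin; `partition_for` is a made-up helper, and `stop:` stands in for `end`, which is a Ruby keyword):

```ruby
# Illustrates BigQuery integer-range partitioning semantics:
# `start` is inclusive, `end` (here `stop`) is exclusive, and each
# partition spans `interval` consecutive values. Values outside
# [start, stop) land in BigQuery's implicit __UNPARTITIONED__ partition.
def partition_for(value, start:, stop:, interval:)
  return :__UNPARTITIONED__ if value < start || value >= stop
  # Partition i covers [start + i * interval, start + (i + 1) * interval).
  (value - start) / interval
end
```

With the README's example range (`start: 1, end: 99999, interval: 1`), value `1` falls in partition 0, while `99999` itself is out of range because the end bound is exclusive.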
@@ -448,6 +454,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/big
 to update the schema of the destination table as a side effect of the load job, but it is not available for copy job.
 Thus, it was not suitable for embulk-output-bigquery idempotence modes, `append`, `replace`, and `replace_backup`, sigh.

+### Range Partitioning
+
+See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).
+
+To load into a partition, specify `range_partitioning` and the `table` parameter with a partition decorator as:
+
+```yaml
+out:
+  type: bigquery
+  table: table_name$1
+  range_partitioning:
+    field: customer_id
+    range:
+      start: 1
+      end: 99999
+      interval: 1
+```
+
 ## Development

 ### Run example:
Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+in:
+  type: file
+  path_prefix: example/example.csv
+  parser:
+    type: csv
+    charset: UTF-8
+    newline: CRLF
+    null_string: 'NULL'
+    skip_header_lines: 1
+    comment_line_marker: '#'
+    columns:
+      - {name: date, type: string}
+      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
+      - {name: "null", type: string}
+      - {name: long, type: long}
+      - {name: string, type: string}
+      - {name: double, type: double}
+      - {name: boolean, type: boolean}
+out:
+  type: bigquery
+  mode: replace
+  auth_method: service_account
+  json_keyfile: example/your-project-000.json
+  dataset: your_dataset_name
+  table: your_field_partitioned_table_name
+  source_format: NEWLINE_DELIMITED_JSON
+  compression: NONE
+  auto_create_dataset: true
+  auto_create_table: true
+  schema_file: example/schema.json
+  range_partitioning:
+    field: 'long'
+    range:
+      start: 90
+      end: 100
+      interval: 1
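With this example's range (`start: 90`, `end: 100`, `interval: 1`), BigQuery creates one partition per interval. A quick sketch of the implied bounds (plain Ruby; `partition_bounds` is a hypothetical helper, and truncating a final partial interval to the end bound is an assumption of this sketch, not taken from the plugin):

```ruby
# Enumerate the [lo, hi) bounds implied by a range_partitioning config.
# BigQuery additionally maintains implicit __NULL__ and __UNPARTITIONED__
# partitions for NULLs and out-of-range values.
def partition_bounds(start:, stop:, interval:)
  (start...stop).step(interval).map { |lo| [lo, [lo + interval, stop].min] }
end
```

For the config above, `partition_bounds(start: 90, stop: 100, interval: 1)` yields the ten single-value ranges `[90, 91), [91, 92), ..., [99, 100)`.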

lib/embulk/output/bigquery.rb

Lines changed: 43 additions & 1 deletion

@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
       'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
       'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
       'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
+      'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
       'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
       'schema_update_options' => config.param('schema_update_options', :array, :default => nil),

@@ -227,14 +228,55 @@ def self.configure(config, schema, task_count)
         task['abort_on_error'] = (task['max_bad_records'] == 0)
       end

+      if task['time_partitioning'] && task['range_partitioning']
+        raise ConfigError.new "`time_partitioning` and `range_partitioning` cannot be used at the same time"
+      end
+
       if task['time_partitioning']
         unless task['time_partitioning']['type']
           raise ConfigError.new "`time_partitioning` must have `type` key"
         end
-      elsif Helper.has_partition_decorator?(task['table'])
+      end
+
+      if Helper.has_partition_decorator?(task['table'])
+        if task['range_partitioning']
+          raise ConfigError.new "Partition decorators (`#{task['table']}`) don't support `range_partitioning`"
+        end
         task['time_partitioning'] = {'type' => 'DAY'}
       end

+      if task['range_partitioning']
+        unless task['range_partitioning']['field']
+          raise ConfigError.new "`range_partitioning` must have `field` key"
+        end
+        unless task['range_partitioning']['range']
+          raise ConfigError.new "`range_partitioning` must have `range` key"
+        end
+
+        range = task['range_partitioning']['range']
+        unless range['start']
+          raise ConfigError.new "`range_partitioning` must have `range.start` key"
+        end
+        unless range['start'].is_a?(Integer)
+          raise ConfigError.new "`range_partitioning.range.start` must be an integer"
+        end
+        unless range['end']
+          raise ConfigError.new "`range_partitioning` must have `range.end` key"
+        end
+        unless range['end'].is_a?(Integer)
+          raise ConfigError.new "`range_partitioning.range.end` must be an integer"
+        end
+        unless range['interval']
+          raise ConfigError.new "`range_partitioning` must have `range.interval` key"
+        end
+        unless range['interval'].is_a?(Integer)
+          raise ConfigError.new "`range_partitioning.range.interval` must be an integer"
+        end
+        if range['start'] + range['interval'] >= range['end']
+          raise ConfigError.new "`range_partitioning.range.start` + `range_partitioning.range.interval` must be less than `range_partitioning.range.end`"
+        end
+      end
+
       if task['clustering']
         unless task['clustering']['fields']
           raise ConfigError.new "`clustering` must have `fields` key"
lib/embulk/output/bigquery/bigquery_client.rb

Lines changed: 12 additions & 0 deletions

@@ -435,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
           }
         end

+        options['range_partitioning'] ||= @task['range_partitioning']
+        if options['range_partitioning']
+          body[:range_partitioning] = {
+            field: options['range_partitioning']['field'],
+            range: {
+              start: options['range_partitioning']['range']['start'].to_s,
+              end: options['range_partitioning']['range']['end'].to_s,
+              interval: options['range_partitioning']['range']['interval'].to_s,
+            },
+          }
+        end
+
         options['clustering'] ||= @task['clustering']
         if options['clustering']
           body[:clustering] = {
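The body fragment built above can be checked in isolation. A minimal sketch (plain Ruby, no client library; `range_partitioning_body` is a hypothetical helper) showing why the `.to_s` calls are there: the BigQuery REST API represents `start`/`end`/`interval` as int64 values encoded as JSON strings:

```ruby
# Build the range_partitioning portion of a table resource from the
# task config, mirroring create_table_if_not_exists: integer bounds
# are converted to strings for the API.
def range_partitioning_body(conf)
  {
    field: conf['field'],
    range: {
      start: conf['range']['start'].to_s,
      end: conf['range']['end'].to_s,
      interval: conf['range']['interval'].to_s,
    },
  }
end
```

Feeding in the README's example config produces `{field: 'customer_id', range: {start: '1', end: '99999', interval: '1'}}`.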

test/test_configure.rb

Lines changed: 38 additions & 0 deletions

@@ -270,6 +270,44 @@ def test_time_partitioning
       assert_equal 'DAY', task['time_partitioning']['type']
     end

+    def test_range_partitioning
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 3, 'interval' => 1 }})
+      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
+
+      # field is required
+      config = least_config.merge('range_partitioning' => {'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo'})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.start is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.end is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.interval is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.start + range.interval should be less than range.end
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 2 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+    end
+
+    def test_time_and_range_partitioning_error
+      config = least_config.merge('time_partitioning' => {'type' => 'DAY'}, 'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      config = least_config.merge('table' => 'table_name$20160912', 'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+    end
+
     def test_clustering
       config = least_config.merge('clustering' => {'fields' => ['field_a']})
       assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }