Commit dc52db3

Add new hive _push method that supports cli operation and table properties. (#43)
1 parent: c1e5199

1 file changed: +275 −82 lines

omniduct/databases/hiveserver2.py

Lines changed: 275 additions & 82 deletions
@@ -3,6 +3,7 @@
 import logging
 import os
 import re
+import shutil
 import tempfile
 import time
 
@@ -17,15 +18,62 @@
 
 
 class HiveServer2Client(DatabaseClient):
+    """
+    This Duct connects to an Apache HiveServer2 server instance using the
+    `pyhive` or `impyla` libraries.
+
+    Attributes:
+        schema (str, None): The default schema to use for queries (will
+            default to server-default if not specified).
+        driver (str): One of 'pyhive' (default) or 'impyla', which specifies
+            how the client communicates with Hive.
+        auth_mechanism (str): The authorisation protocol to use for connections.
+            Defaults to 'NOSASL'. Authorisation methods differ between drivers.
+            Please refer to `pyhive` and `impyla` documentation for more details.
+        push_using_hive_cli (bool): Whether the `.push()` operation should
+            directly add files using `LOAD DATA LOCAL INPATH` rather than the
+            `INSERT` operation via SQLAlchemy. Note that this requires the
+            presence of the `hive` executable on the local PATH, or if
+            connecting via a `RemoteClient` instance, on the remote's PATH.
+            This is mostly useful for older versions of Hive which do not
+            support the `INSERT` statement.
+        default_table_props (dict): A dictionary of table properties to use by
+            default when creating tables.
+        connection_options (dict): Additional options to pass through to the
+            `.connect()` methods of the drivers.
+    """
 
     PROTOCOLS = ['hiveserver2']
     DEFAULT_PORT = 3623
 
-    def _init(self, schema=None, driver='pyhive', auth_mechanism='NOSASL', **connection_options):
+    def _init(self, schema=None, driver='pyhive', auth_mechanism='NOSASL',
+              push_using_hive_cli=False, default_table_props=None, **connection_options):
+        """
+        schema (str, None): The default database/schema to use for queries (will
+            default to server-default if not specified).
+        driver (str): One of 'pyhive' (default) or 'impyla', which specifies
+            how the client communicates with Hive.
+        auth_mechanism (str): The authorisation protocol to use for connections.
+            Defaults to 'NOSASL'. Authorisation methods differ between drivers.
+            Please refer to `pyhive` and `impyla` documentation for more details.
+        push_using_hive_cli (bool): Whether the `.push()` operation should
+            directly add files using `LOAD DATA LOCAL INPATH` rather than the
+            `INSERT` operation via SQLAlchemy. Note that this requires the
+            presence of the `hive` executable on the local PATH, or if
+            connecting via a `RemoteClient` instance, on the remote's PATH.
+            This is mostly useful for older versions of Hive which do not
+            support the `INSERT` statement. False by default.
+        default_table_props (dict): A dictionary of table properties to use by
+            default when creating tables (default is an empty dict).
+        **connection_options (dict): Additional options to pass through to the
+            `.connect()` methods of the drivers.
+        """
         self.schema = schema
         self.driver = driver
         self.auth_mechanism = auth_mechanism
         self.connection_options = connection_options
+        self.push_using_hive_cli = push_using_hive_cli
+        self.default_table_props = default_table_props or {}
        self.__hive = None
        self.connection_fields += ('schema',)

@@ -74,12 +122,11 @@ def _disconnect(self):
         self._sqlalchemy_engine = None
         self._sqlalchemy_metadata = None
 
-    def _execute(self, statement, cursor=None, poll_interval=1, async=False):
+    def _execute(self, statement, cursor=None, async=False, poll_interval=1):
         """
-        Execute command
-
-        poll_interval : int, optional
-            Default delay in polling for query status
+        Additional Parameters:
+            poll_interval (int): Default delay in seconds between consecutive
+                query status checks (defaults to 1).
         """
         cursor = cursor or self.__hive_cursor()
         log_offset = 0
@@ -135,11 +182,142 @@ def _log_status(self, cursor, log_offset=0):
 
         return len(log)
 
-    def _push(self, df, table, if_exists='fail', schema=None, **kwargs):
+    def _push(self, df, table, if_exists='fail', schema=None, use_hive_cli=None,
+              partition=None, sep=chr(1), table_props=None, dtype_overrides=None, **kwargs):
+        """
+        If `use_hive_cli` (or, if not specified, `.push_using_hive_cli`) is
+        `True`, a `CREATE TABLE` statement will be automatically generated based
+        on the datatypes of the DataFrame (unless overwritten by
+        `dtype_overrides`). The `DataFrame` will then be exported to a CSV
+        compatible with Hive and uploaded (if necessary) to the remote, before
+        being loaded into Hive using a `LOAD DATA LOCAL INPATH ...` query using
+        the `hive` CLI executable. Note that if a table is not partitioned, you
+        cannot convert it to a partitioned table without deleting it first.
+
+        If `use_hive_cli` (or, if not specified, `.push_using_hive_cli`) is
+        `False`, an attempt will be made to push the `DataFrame` to Hive using
+        `pandas.DataFrame.to_sql` and the SQLAlchemy binding provided by
+        `pyhive` and `impyla`. This may be slower, does not support older
+        versions of Hive, and does not support table properties or partitioning.
+
+        Additional Parameters:
+            schema (str): The schema into which the table should be pushed. If
+                not specified, the schema will be set to your username.
+            use_hive_cli (bool, None): A local override for the global
+                `.push_using_hive_cli` attribute. If not specified, the global
+                default is used. If True, then pushes are performed using the
+                `hive` CLI executable on the local/remote PATH.
+            **kwargs (dict): Additional arguments to send to `pandas.DataFrame.to_sql`.
+
+        Further Parameters for CLI method (specifying these for the pandas
+        method will cause a `RuntimeError` exception):
+            partition (dict): A mapping of column names to values that specify
+                the partition into which the provided data should be uploaded,
+                as well as providing the fields by which new tables should be
+                partitioned.
+            sep (str): Field delimiter for data (defaults to CTRL-A, or `chr(1)`).
+            table_props (dict): Properties to set on any newly created tables
+                (extends `.default_table_props`).
+            dtype_overrides (dict): Mapping of column names to Hive datatypes to
+                use instead of the default mapping.
+        """
+        schema = schema or self.username
+        use_hive_cli = use_hive_cli or self.push_using_hive_cli
+        partition = partition or {}
+        table_props = table_props or {}
+        dtype_overrides = dtype_overrides or {}
+
+        # Try using SQLAlchemy method
+        if not use_hive_cli:
+            if partition or table_props or dtype_overrides:
+                raise RuntimeError(
+                    "At least one of `partition` or `table_props` or "
+                    "`dtype_overrides` has been specified. Setting table "
+                    "properties or partition information is not supported "
+                    "via the SQLAlchemy backend. If this is important, please "
+                    "pass `use_hive_cli=True`, otherwise remove these values "
+                    "and try again."
+                )
+            try:
+                return df.to_sql(name=table, con=self._sqlalchemy_engine,
+                                 index=False, if_exists=if_exists,
+                                 schema=schema, **kwargs)
+            except Exception as e:
+                raise RuntimeError(
+                    "Push unsuccessful. Your version of Hive may be too old to "
+                    "support the `INSERT` keyword. You might want to try setting "
+                    "`.push_using_hive_cli = True` if your local or remote "
+                    "machine has access to the `hive` CLI executable. The "
+                    "original exception was: {}".format(e.args[0]))
+
+        # Try using Hive CLI
+
+        # If `partition` is specified, the associated columns must not be
+        # present in the dataframe.
+        assert len(set(partition).intersection(df.columns)) == 0, "The dataframe to be uploaded must not have any partitioned fields. Please remove the field(s): {}.".format(','.join(set(partition).intersection(df.columns)))
+
+        # Save dataframe to file and send it to the remote server if necessary
+        temp_dir = tempfile.mkdtemp(prefix='omniduct_hiveserver2')
+        tmp_fname = os.path.join(temp_dir, 'data_{}.csv'.format(time.time()))
+        logger.info('Saving dataframe to file... {}'.format(tmp_fname))
+        df.fillna(r'\N').to_csv(tmp_fname, index=False, header=False,
+                                sep=sep, encoding='utf-8')
+
+        if self.remote:
+            logger.info("Uploading data to remote host...")
+            self.remote.upload(tmp_fname)
+
+        # Generate create table statement.
+        tblprops = self.default_table_props.copy()
+        tblprops.update(table_props or {})
+        cts = self._create_table_statement_from_df(
+            df=df, table=table,
+            schema=schema,
+            drop=(if_exists == 'replace') and not partition,
+            text=True,
+            sep=sep,
+            table_props=tblprops,
+            partition_cols=list(partition),
+            dtype_overrides=dtype_overrides
+        )
+
+        # Generate load data statement.
+        partition_clause = '' if not partition else 'PARTITION ({})'.format(','.join("{key} = '{value}'".format(key=key, value=value) for key, value in partition.items()))
+        lds = '\nLOAD DATA LOCAL INPATH "{path}" {overwrite} INTO TABLE {schema}.{table} {partition_clause};'.format(
+            path=os.path.basename(tmp_fname) if self.remote else tmp_fname,
+            overwrite="OVERWRITE" if if_exists == "replace" else "",
+            schema=schema,
+            table=table,
+            partition_clause=partition_clause
+        )
+
+        # Run create table statement and load data statements
+        logger.info(
+            "Creating hive table `{schema}.{table}` if it does not "
+            "already exist, and inserting the provided data{partition}."
+            .format(
+                schema=schema,
+                table=table,
+                partition=" into {}".format(partition_clause) if partition_clause else ""
+            )
+        )
         try:
-            return DatabaseClient._push(self, df, table, if_exists=if_exists, schema=schema or self.username, **kwargs)
-        except Exception as e:
-            raise RuntimeError("Push unsuccessful. Your version of Hive may be too old to support the `INSERT` keyword. Original exception was: {}".format(e.args[0]))
+            stmts = '\n'.join([cts, lds])
+            logger.debug(stmts)
+            proc = self._run_in_hivecli(stmts)
+            if proc.returncode != 0:
+                raise RuntimeError(proc.stderr.decode())
+        finally:
+            # Clean up files
+            if self.remote:
+                self.remote.execute('rm -rf {}'.format(tmp_fname))
+            shutil.rmtree(temp_dir, ignore_errors=True)
+
+        logger.info("Successfully uploaded dataframe {partition}`{schema}.{table}`.".format(
+            schema=schema,
+            table=table,
+            partition="into {} of ".format(partition_clause) if partition_clause else ""
+        ))
 
     def _table_list(self, schema=None, like='*', **kwargs):
         schema = schema or self.schema or 'default'
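
A hedged usage sketch of the new CLI push path documented above, assuming the public `.push()` wrapper forwards these keyword arguments through to `_push` (the table, schema, partition and property values are illustrative only):

import pandas as pd

df = pd.DataFrame({'user_id': [1, 2], 'score': [0.5, 0.9]})

hive.push(
    df, 'user_scores', schema='analytics', if_exists='replace',
    use_hive_cli=True,                      # force the LOAD DATA LOCAL INPATH route
    partition={'ds': '2018-01-01'},         # partition columns must not appear in df
    table_props={'comment': 'demo table'},  # merged over default_table_props
    dtype_overrides={'user_id': 'INT'},     # override the default BIGINT mapping
)

With `use_hive_cli=False` (the default unless `push_using_hive_cli` is set), the same call without `partition`, `table_props` and `dtype_overrides` would instead go through `pandas.DataFrame.to_sql`.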
@@ -184,75 +362,90 @@ def _run_in_hivecli(self, cmd):
         proc = run_in_subprocess(sys_cmd, check_output=True)
         return proc
 
-
-def _create_table_statement_from_df(df, table, schema='default', drop=False,
-                                    text=True, sep=None, loc=None):
-    """
-    Return create table statement for new hive table based on pandas dataframe.
-
-    Parameters
-    ----------
-    df : pandas.DataFrame or pandas.Series
-        Used to determine column names and types for create table statement.
-    table : str
-        Table name for create table statement.
-    schema : str
-        Schema for create table statement
-    drop : bool
-        Whether to include a drop table statement along with create table statement.
-    text : bool
-        Whether data will be stored as a text file.
-    sep : str
-        Field delimiter for text file (only used if text==True).
-    loc : str, optional
-        Desired hdfs location.
-
-    Returns
-    -------
-    cmd : str
-        A create table statement.
-    """
-    # dtype kind to hive type mapping dict.
-    DTYPE_KIND_HIVE_TYPE = {
-        'b': 'BOOLEAN',  # boolean
-        'i': 'BIGINT',   # signed integer
-        'u': 'BIGINT',   # unsigned integer
-        'f': 'DOUBLE',   # floating-point
-        'c': 'STRING',   # complex floating-point
-        'O': 'STRING',   # object
-        'S': 'STRING',   # (byte-)string
-        'U': 'STRING',   # Unicode
-        'V': 'STRING'    # void
-    }
-    sep = sep or "\t"
-
-    # Sanitive column names and map data types to hive types.
-    columns = []
-    for col, dtype in df.dtypes.iteritems():
-        col_sanitized = re.sub('\W', '', col.lower().replace(' ', '_'))
-        hive_type = DTYPE_KIND_HIVE_TYPE[dtype.kind]
-        columns.append(' {column} {type}'.format(column=col_sanitized,
-                                                 type=hive_type))
-
-    cmd = Template("""
-    {% if drop %}
-    DROP TABLE IF EXISTS {{ schema }}.{{ table }};
-    {% endif -%}
-    CREATE TABLE IF NOT EXISTS {{ schema }}.{{ table }} (
-        {%- for col in columns %}
-        {{ col }} {% if not loop.last %}, {% endif %}
-        {%- endfor %}
-    )
-    {%- if text %}
-    ROW FORMAT DELIMITED
-    FIELDS TERMINATED BY "{{ sep }}"
-    STORED AS TEXTFILE
-    {% endif %}
-    {%- if loc %}
-    LOCATION "{{ loc }}"
-    {%- endif %}
-    ;
-    """).render(drop=drop, table=table, schema=schema, columns=columns, text=text, sep=sep)
-
-    logger.debug('Create Table Statement: {}'.format(cmd))
-    return cmd
+    @classmethod
+    def _create_table_statement_from_df(cls, df, table, schema='default', drop=False,
+                                        text=True, sep=chr(1), loc=None,
+                                        table_props=None, partition_cols=None,
+                                        dtype_overrides=None):
+        """
+        Return create table statement for new hive table based on pandas dataframe.
+
+        Parameters:
+            df (pandas.DataFrame, pandas.Series): Used to determine column names
+                and types for create table statement.
+            table (str): The name of the target table.
+            schema (str): The name of the target schema.
+            drop (bool): Whether to include a drop table statement before the
+                create table statement.
+            text (bool): Whether data will be stored as a textfile.
+            sep (str): The separator used by the text data store (defaults to
+                CTRL-A, i.e. `chr(1)`, which is the default Hive separator).
+            loc (str): Desired HDFS location (if not the default).
+            table_props (dict): The table properties (if any) to set on the table.
+            partition_cols (list): The columns by which the created table should
+                be partitioned.
+
+        Returns:
+            str: The Hive SQL required to create the table with the above
+                configuration.
+        """
+        table_props = table_props or {}
+        partition_cols = partition_cols or []
+        dtype_overrides = dtype_overrides or {}
+
+        # dtype kind to hive type mapping dict.
+        DTYPE_KIND_HIVE_TYPE = {
+            'b': 'BOOLEAN',  # boolean
+            'i': 'BIGINT',   # signed integer
+            'u': 'BIGINT',   # unsigned integer
+            'f': 'DOUBLE',   # floating-point
+            'c': 'STRING',   # complex floating-point
+            'O': 'STRING',   # object
+            'S': 'STRING',   # (byte-)string
+            'U': 'STRING',   # Unicode
+            'V': 'STRING'    # void
+        }
+
+        # Sanitise column names and map numpy/pandas data-types to hive types.
+        columns = []
+        for col, dtype in df.dtypes.iteritems():
+            col_sanitized = re.sub('\W', '', col.lower().replace(' ', '_'))
+            hive_type = dtype_overrides.get(col) or DTYPE_KIND_HIVE_TYPE[dtype.kind]
+            columns.append(
+                ' {column} {type}'.format(column=col_sanitized, type=hive_type)
+            )
+
+        partition_columns = ['{} STRING'.format(col) for col in partition_cols]
+
+        tblprops = ["'{key}' = '{value}'".format(key=key, value=value) for key, value in table_props.items()]
+        tblprops = "TBLPROPERTIES({})".format(",".join(tblprops)) if len(tblprops) > 0 else ""
+
+        cmd = Template("""
+        {% if drop %}
+        DROP TABLE IF EXISTS {{ schema }}.{{ table }};
+        {% endif -%}
+        CREATE TABLE IF NOT EXISTS {{ schema }}.{{ table }} (
+            {%- for col in columns %}
+            {{ col }} {% if not loop.last %}, {% endif %}
+            {%- endfor %}
+        )
+        {%- if partition_columns %}
+        PARTITIONED BY (
+            {%- for col in partition_columns %}
+            {{ col }} {% if not loop.last %}, {% endif %}
+            {%- endfor %}
+        )
+        {%- endif %}
+        {%- if text %}
+        ROW FORMAT DELIMITED
+        FIELDS TERMINATED BY "{{ sep }}"
+        STORED AS TEXTFILE
+        {% endif %}
+        {%- if loc %}
+        LOCATION "{{ loc }}"
+        {%- endif %}
+        {{ tblprops }}
+        ;
+        """).render(**locals())
+
+        return cmd
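
As a rough illustration of the DDL this classmethod produces, the sketch below calls it directly on a small frame and prints the result. The column names and partition column are hypothetical; per the template above, the printed statement should contain CREATE TABLE, PARTITIONED BY (ds STRING), and TBLPROPERTIES clauses, with the template's own indentation preserved:

import pandas as pd
from omniduct.databases.hiveserver2 import HiveServer2Client

df = pd.DataFrame({'user_id': [1, 2], 'score': [0.5, 0.9]})

# A pure function of its arguments, so no connected client is required.
ddl = HiveServer2Client._create_table_statement_from_df(
    df, table='user_scores', schema='analytics',
    table_props={'comment': 'demo table'},
    partition_cols=['ds'],
    dtype_overrides={'user_id': 'INT'},
)
print(ddl)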
