Skip to content

[ENH] Conditional_join #956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
86cd0a9
Merge pull request #1 from pyjanitor-devs/dev
samukweku Apr 23, 2021
7ee2e19
Merge branch 'pyjanitor-devs:dev' into dev
samukweku May 24, 2021
32be96c
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Jun 5, 2021
513ef04
updates
samukweku Jul 26, 2021
f3d9b11
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 1, 2021
4f98e9d
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 15, 2021
37af6a6
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 19, 2021
facb52c
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 19, 2021
5f3c8e3
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 20, 2021
057c39b
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 20, 2021
c25235a
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Aug 22, 2021
5a90734
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Sep 2, 2021
d0fb585
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Sep 5, 2021
cdef368
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Sep 5, 2021
ccbab57
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Sep 12, 2021
c4a47ba
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Oct 2, 2021
92e99aa
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Oct 3, 2021
5563104
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Oct 7, 2021
4987fc2
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Oct 11, 2021
1a6ef85
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Oct 17, 2021
ee2a51a
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Oct 25, 2021
fe0fac6
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Nov 1, 2021
b3c3bff
Merge branch 'pyjanitor-devs:dev' into dev
samukweku Nov 2, 2021
56d4e09
updates
samukweku Nov 2, 2021
3973d2c
updates
samukweku Nov 2, 2021
119b427
updates
samukweku Nov 2, 2021
8c6e780
updates
samukweku Nov 2, 2021
4d8c4c6
updates
samukweku Nov 2, 2021
742fb3f
updates
samukweku Nov 2, 2021
5f7583c
more tests
samukweku Nov 2, 2021
ec40c96
tests mods
samukweku Nov 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 124 additions & 105 deletions janitor/functions/conditional_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,39 @@ def conditional_join(
)


class _JoinOperator(Enum):
    """
    List of operators used in conditional_join.

    Each member's value is the string token a caller supplies in a
    condition (compared elsewhere via ``op == _JoinOperator.X.value``),
    and the keys of ``operator_map`` are drawn from these values.
    """

    GREATER_THAN = ">"
    LESS_THAN = "<"
    GREATER_THAN_OR_EQUAL = ">="
    LESS_THAN_OR_EQUAL = "<="
    STRICTLY_EQUAL = "=="
    NOT_EQUAL = "!="


class _JoinTypes(Enum):
    """
    List of join types for conditional_join.

    Member values are the string forms accepted for the ``how``
    argument (e.g. compared via ``how == _JoinTypes.INNER.value``).
    """

    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"


# Lookup table from an operator token (a `_JoinOperator` value such as
# "==" or "<") to the matching `operator`-module comparison function.
# Built from explicit pairs so the token/function correspondence is
# easy to audit at a glance.
operator_map = {
    join_op.value: comparator
    for join_op, comparator in (
        (_JoinOperator.STRICTLY_EQUAL, operator.eq),
        (_JoinOperator.LESS_THAN, operator.lt),
        (_JoinOperator.LESS_THAN_OR_EQUAL, operator.le),
        (_JoinOperator.GREATER_THAN, operator.gt),
        (_JoinOperator.GREATER_THAN_OR_EQUAL, operator.ge),
        (_JoinOperator.NOT_EQUAL, operator.ne),
    )
}


def _conditional_join_compute(
df: pd.DataFrame,
right: pd.DataFrame,
Expand Down Expand Up @@ -413,6 +446,10 @@ def _multiple_conditional_join_ne(

if not mask.any():
return None

if mask.all():
return df_index, right_index

if is_extension_array_dtype(mask):
mask = mask.to_numpy(dtype=bool, na_value=False)

Expand All @@ -425,6 +462,7 @@ def _multiple_conditional_join_eq(
"""
Get indices for multiple conditions,
if any of the conditions has an `==` operator.

Returns a tuple of (df_index, right_index)
"""

Expand Down Expand Up @@ -465,15 +503,19 @@ def _multiple_conditional_join_eq(
left_c = left_c[df_index]
right_c = extract_array(right[right_on], extract_numpy=True)
right_c = right_c[right_index]

op = operator_map[op]

if mask is None:
mask = op(left_c, right_c)
else:
mask &= op(left_c, right_c)

if not mask.any():
return None

if mask.all():
return df_index, right_index

if is_extension_array_dtype(mask):
mask = mask.to_numpy(dtype=bool, na_value=False)

Expand Down Expand Up @@ -505,9 +547,10 @@ def _multiple_conditional_join_le_lt(
(left_op, right_op),
) = zip(*conditions)
if check2:
start_left, end_left = end_left, start_left
start_right, end_right = end_right, start_right
left_op, right_op = right_op, left_op
outcome = _range_indices(
return _range_indices(
df,
right,
start_left,
Expand All @@ -518,73 +561,65 @@ def _multiple_conditional_join_le_lt(
right_op,
)

if outcome is None:
return None
# find minimum df_index and right_index
# aim is to reduce search space
df_index = df.index
right_index = right.index
arrs = []
for left_on, right_on, op in conditions:
# no point checking for `!=`, since best case scenario
# they'll have the same no of rows as the other operators
if op == _JoinOperator.NOT_EQUAL.value:
continue

df_index, right_index = outcome
result = _generic_func_cond_join(
df.loc[df_index],
right.loc[right_index],
left_on,
right_on,
op,
join_type="multiple",
)

else:
# find minimum df_index and right_index
# aim is to reduce search space
df_index = df.index
right_index = right.index
arrs = []
for left_on, right_on, op in conditions:
# no point checking for `!=`, since best case scenario
# they'll have the same no of rows as the other operators
if op == _JoinOperator.NOT_EQUAL.value:
continue

result = _generic_func_cond_join(
df.loc[df_index],
right.loc[right_index],
left_on,
right_on,
op,
join_type="multiple",
)
if result is None:
return None

if result is None:
return None

df_index, right_index, arr, lengths = result
arrs.append((df_index, arr, lengths))

new_arrs = []

# trim to the minimum index for df
# and the smallest indices for right
# the minimum index for df should be available
# to all conditions; we achieve this via boolean indexing
for l_index, r_index, repeats in arrs:
bools = np.isin(l_index, df_index)
if not np.all(bools):
l_index = l_index[bools]
r_index = compress(r_index, bools)
repeats = repeats[bools]
new_arrs.append((l_index, r_index, repeats))
_, arr, repeats = zip(*new_arrs)

# with the aim of reducing the search space
# we get the smallest indices for each index in df
# e.g if df_index is [1, 2, 3]
# and there are two outcomes for the conditions for right:
# [[1,2,3], [4]], [[2], [4, 6]]
# the reconstituted indices will be the smallest per pairing
# which turns out to : [ [2], [4]]
# we achieve this by getting the minimum size in `repeats`
# and use that to index into `arr`
repeats = np.vstack(repeats)
positions = np.argmin(repeats, axis=0)
repeats = np.minimum.reduce(repeats)
arrays = []
arr = zip(
*arr
) # pair all the indices for right obtained per condition
for row, pos in zip(arr, positions):
arrays.append(row[pos])
right_index = np.concatenate(arrays)
df_index = df_index.repeat(repeats)
df_index, right_index, arr, lengths = result
arrs.append((df_index, arr, lengths))

new_arrs = []

# trim to the minimum index for df
# and the smallest indices for right
# the minimum index for df should be available
# to all conditions; we achieve this via boolean indexing
for l_index, r_index, repeats in arrs:
bools = np.isin(l_index, df_index)
if not np.all(bools):
l_index = l_index[bools]
r_index = compress(r_index, bools)
repeats = repeats[bools]
new_arrs.append((l_index, r_index, repeats))
_, arr, repeats = zip(*new_arrs)

# with the aim of reducing the search space
# we get the smallest indices for each index in df
# e.g if df_index is [1, 2, 3]
# and there are two outcomes for the conditions for right:
# [[1,2,3], [4]], [[2], [4, 6]]
# the reconstituted indices will be the smallest per pairing
# which turns out to : [ [2], [4]]
# we achieve this by getting the minimum size in `repeats`
# and use that to index into `arr`
repeats = np.vstack(repeats)
positions = np.argmin(repeats, axis=0)
repeats = np.minimum.reduce(repeats)
arrays = []
arr = zip(*arr) # pair all the indices for right obtained per condition
for row, pos in zip(arr, positions):
arrays.append(row[pos])
right_index = np.concatenate(arrays)
df_index = df_index.repeat(repeats)

mask = None
for left_on, right_on, op in conditions:
Expand All @@ -601,25 +636,16 @@ def _multiple_conditional_join_le_lt(

if not mask.any():
return None

if mask.all():
return df_index, right_index

if is_extension_array_dtype(mask):
mask = mask.to_numpy(dtype=bool, na_value=False)

return df_index[mask], right_index[mask]


class _JoinOperator(Enum):
    """
    List of operators used in conditional_join.

    Member values are the operator tokens ("<", "==", etc.) that
    callers pass in a join condition; they also serve as the keys
    of ``operator_map``.
    """

    GREATER_THAN = ">"
    LESS_THAN = "<"
    GREATER_THAN_OR_EQUAL = ">="
    LESS_THAN_OR_EQUAL = "<="
    STRICTLY_EQUAL = "=="
    NOT_EQUAL = "!="


def _create_conditional_join_empty_frame(
df: pd.DataFrame, right: pd.DataFrame, how: str
):
Expand Down Expand Up @@ -662,16 +688,6 @@ def _create_conditional_join_empty_frame(
return df.join(right, how=how, sort=False)


class _JoinTypes(Enum):
    """
    List of join types for conditional_join.

    Member values are the accepted string forms of the ``how``
    argument ("inner", "left", "right").
    """

    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"


def _create_conditional_join_frame(
df: pd.DataFrame,
right: pd.DataFrame,
Expand All @@ -688,19 +704,20 @@ def _create_conditional_join_frame(
sorter = np.lexsort((right_index, left_index))
right_index = right_index[sorter]
left_index = left_index[sorter]
sorter = None

if set(df.columns).intersection(right.columns):
df.columns = pd.MultiIndex.from_product([["left"], df.columns])
right.columns = pd.MultiIndex.from_product([["right"], right.columns])

if how == _JoinTypes.INNER.value:
df = {
key: extract_array(value, extract_numpy=True)
for key, value in df.loc[left_index].items()
key: extract_array(value, extract_numpy=True)[left_index]
for key, value in df.items()
}
right = {
key: extract_array(value, extract_numpy=True)
for key, value in right.loc[right_index].items()
key: extract_array(value, extract_numpy=True)[right_index]
for key, value in right.items()
}
return pd.DataFrame({**df, **right})

Expand Down Expand Up @@ -753,7 +770,7 @@ def _range_indices(
right_op: str,
):
"""
Retrieve search space for a range/between join.
Retrieve index positions for a range/between join.

Idea inspired by article:
https://www.vertica.com/blog/what-is-a-range-join-and-why-is-it-so-fastba-p223413/
Expand Down Expand Up @@ -831,6 +848,18 @@ def _range_indices(
right_index = np.concatenate(right_index)
left_index = np.repeat(left_index, repeater)

# here we search for actual positions
# where left_c is </<= right_c
left_c = extract_array(df[end_left], extract_numpy=True)[left_index]
right_c = extract_array(right[end_right], extract_numpy=True)[right_index]

mask = right_op(left_c, right_c)

if not mask.any():
return None
if not mask.all():
return left_index[mask], right_index[mask]

return left_index, right_index


Expand Down Expand Up @@ -926,7 +955,7 @@ def _less_than_indices(

right_c = [right_index[ind:len_right] for ind in search_indices]

if join_type != "single":
if join_type == "multiple":
return (
left_index,
right_index[search_indices.min() :], # noqa: E203
Expand Down Expand Up @@ -1128,13 +1157,3 @@ def _not_equal_indices(
right_c = np.concatenate([lt_right, gt_right])

return left_c, right_c


# Maps an operator token (a `_JoinOperator` value, e.g. "==") to the
# corresponding comparison function from the stdlib `operator` module,
# so condition strings can be applied directly to column arrays.
operator_map = {
    _JoinOperator.STRICTLY_EQUAL.value: operator.eq,
    _JoinOperator.LESS_THAN.value: operator.lt,
    _JoinOperator.LESS_THAN_OR_EQUAL.value: operator.le,
    _JoinOperator.GREATER_THAN.value: operator.gt,
    _JoinOperator.GREATER_THAN_OR_EQUAL.value: operator.ge,
    _JoinOperator.NOT_EQUAL.value: operator.ne,
}
Loading