Skip to content

Commit

Permalink
Adjust for enforced deprecations in pandas
Browse files (browse the repository at this point in the history)
  • Loading branch information
phofl committed Feb 6, 2024
1 parent 83368b5 commit ccab90b
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 114 deletions.
1 change: 1 addition & 0 deletions dask/dataframe/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PANDAS_GE_210 = PANDAS_VERSION.release >= (2, 1, 0)
PANDAS_GE_211 = PANDAS_VERSION.release >= (2, 1, 1)
PANDAS_GE_220 = PANDAS_VERSION.release >= (2, 2, 0)
PANDAS_GE_300 = PANDAS_VERSION.release >= (3, 0, 0)

import pandas.testing as tm

Expand Down
135 changes: 69 additions & 66 deletions dask/dataframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
PANDAS_GE_150,
PANDAS_GE_200,
PANDAS_GE_210,
PANDAS_GE_300,
PANDAS_VERSION,
check_convert_dtype_deprecation,
check_nuisance_columns_warning,
Expand Down Expand Up @@ -3808,84 +3809,86 @@ def resample(self, rule, closed=None, label=None):

return Resampler(self, rule, closed=closed, label=label)

@_deprecated(
message=(
"Will be removed in a future version. "
"Please create a mask and filter using .loc instead"
)
)
@derived_from(pd.DataFrame)
def first(self, offset):
# Let pandas error on bad args
self._meta_nonempty.first(offset)
if not PANDAS_GE_300:

if not self.known_divisions:
raise ValueError("`first` is not implemented for unknown divisions")
@_deprecated(
message=(
"Will be removed in a future version. "
"Please create a mask and filter using .loc instead"
)
)
@derived_from(pd.DataFrame)
def first(self, offset):
# Let pandas error on bad args
self._meta_nonempty.first(offset)

offset = pd.tseries.frequencies.to_offset(offset)
date = self.divisions[0] + offset
end = self.loc._get_partitions(date)
if not self.known_divisions:
raise ValueError("`first` is not implemented for unknown divisions")

is_anchored = offset.is_anchored()
offset = pd.tseries.frequencies.to_offset(offset)
date = self.divisions[0] + offset
end = self.loc._get_partitions(date)

include_right = is_anchored or not hasattr(offset, "delta")
is_anchored = offset.is_anchored()

if end == self.npartitions - 1:
divs = self.divisions
else:
divs = self.divisions[: end + 1] + (date,)
include_right = is_anchored or not hasattr(offset, "delta")

name = "first-" + tokenize(self, offset)
dsk = {(name, i): (self._name, i) for i in range(end)}
dsk[(name, end)] = (
methods.boundary_slice,
(self._name, end),
None,
date,
include_right,
True,
)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
return new_dd_object(graph, name, self, divs)
if end == self.npartitions - 1:
divs = self.divisions
else:
divs = self.divisions[: end + 1] + (date,)

name = "first-" + tokenize(self, offset)
dsk = {(name, i): (self._name, i) for i in range(end)}
dsk[(name, end)] = (
methods.boundary_slice,
(self._name, end),
None,
date,
include_right,
True,
)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
return new_dd_object(graph, name, self, divs)

@_deprecated(
message=(
"Will be removed in a future version. "
"Please create a mask and filter using .loc instead"
@_deprecated(
message=(
"Will be removed in a future version. "
"Please create a mask and filter using .loc instead"
)
)
)
@derived_from(pd.DataFrame)
def last(self, offset):
# Let pandas error on bad args
self._meta_nonempty.last(offset)
@derived_from(pd.DataFrame)
def last(self, offset):
# Let pandas error on bad args
self._meta_nonempty.last(offset)

if not self.known_divisions:
raise ValueError("`last` is not implemented for unknown divisions")
if not self.known_divisions:
raise ValueError("`last` is not implemented for unknown divisions")

offset = pd.tseries.frequencies.to_offset(offset)
date = self.divisions[-1] - offset
start = self.loc._get_partitions(date)
offset = pd.tseries.frequencies.to_offset(offset)
date = self.divisions[-1] - offset
start = self.loc._get_partitions(date)

if start == 0:
divs = self.divisions
else:
divs = (date,) + self.divisions[start + 1 :]
if start == 0:
divs = self.divisions
else:
divs = (date,) + self.divisions[start + 1 :]

name = "last-" + tokenize(self, offset)
dsk = {
(name, i + 1): (self._name, j + 1)
for i, j in enumerate(range(start, self.npartitions))
}
dsk[(name, 0)] = (
methods.boundary_slice,
(self._name, start),
date,
None,
True,
False,
)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
return new_dd_object(graph, name, self, divs)
name = "last-" + tokenize(self, offset)
dsk = {
(name, i + 1): (self._name, j + 1)
for i, j in enumerate(range(start, self.npartitions))
}
dsk[(name, 0)] = (
methods.boundary_slice,
(self._name, start),
date,
None,
True,
False,
)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
return new_dd_object(graph, name, self, divs)

def nunique_approx(self, split_every=None):
"""Approximate number of unique rows.
Expand Down
97 changes: 51 additions & 46 deletions dask/dataframe/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PANDAS_GE_200,
PANDAS_GE_210,
PANDAS_GE_220,
PANDAS_GE_300,
check_groupby_axis_deprecation,
check_numeric_only_deprecation,
check_observed_deprecation,
Expand Down Expand Up @@ -2859,57 +2860,61 @@ def _normalize_axis(self, axis, method: str):

return axis

@_deprecated(message="Please use `ffill`/`bfill` or `fillna` without a GroupBy.")
def fillna(self, value=None, method=None, limit=None, axis=no_default):
"""Fill NA/NaN values using the specified method.
if not PANDAS_GE_300:

Parameters
----------
value : scalar, default None
Value to use to fill holes (e.g. 0).
method : {'bfill', 'ffill', None}, default None
Method to use for filling holes in reindexed Series. ffill: propagate last
valid observation forward to next valid. bfill: use next valid observation
to fill gap.
axis : {0 or 'index', 1 or 'columns'}
Axis along which to fill missing values.
limit : int, default None
If method is specified, this is the maximum number of consecutive NaN values
to forward/backward fill. In other words, if there is a gap with more than
this number of consecutive NaNs, it will only be partially filled. If method
is not specified, this is the maximum number of entries along the entire
axis where NaNs will be filled. Must be greater than 0 if not None.
Returns
-------
Series or DataFrame
Object with missing values filled
See also
--------
pandas.core.groupby.DataFrameGroupBy.fillna
"""
axis = self._normalize_axis(axis, "fillna")
if not np.isscalar(value) and value is not None:
raise NotImplementedError(
"groupby-fillna with value=dict/Series/DataFrame is not supported"
)
@_deprecated(
message="Please use `ffill`/`bfill` or `fillna` without a GroupBy."
)
def fillna(self, value=None, method=None, limit=None, axis=no_default):
"""Fill NA/NaN values using the specified method.
Parameters
----------
value : scalar, default None
Value to use to fill holes (e.g. 0).
method : {'bfill', 'ffill', None}, default None
Method to use for filling holes in reindexed Series. ffill: propagate last
valid observation forward to next valid. bfill: use next valid observation
to fill gap.
axis : {0 or 'index', 1 or 'columns'}
Axis along which to fill missing values.
limit : int, default None
If method is specified, this is the maximum number of consecutive NaN values
to forward/backward fill. In other words, if there is a gap with more than
this number of consecutive NaNs, it will only be partially filled. If method
is not specified, this is the maximum number of entries along the entire
axis where NaNs will be filled. Must be greater than 0 if not None.
Returns
-------
Series or DataFrame
Object with missing values filled
See also
--------
pandas.core.groupby.DataFrameGroupBy.fillna
"""
axis = self._normalize_axis(axis, "fillna")
if not np.isscalar(value) and value is not None:
raise NotImplementedError(
"groupby-fillna with value=dict/Series/DataFrame is not supported"
)

kwargs = dict(value=value, method=method, limit=limit, axis=axis)
if PANDAS_GE_220:
func = M.fillna
kwargs.update(include_groups=False)
else:
func = _drop_apply
kwargs.update(by=self.by, what="fillna")
kwargs = dict(value=value, method=method, limit=limit, axis=axis)
if PANDAS_GE_220:
func = M.fillna
kwargs.update(include_groups=False)
else:
func = _drop_apply
kwargs.update(by=self.by, what="fillna")

meta = self._meta_nonempty.apply(func, **kwargs)
result = self.apply(func, meta=meta, **kwargs)
meta = self._meta_nonempty.apply(func, **kwargs)
result = self.apply(func, meta=meta, **kwargs)

if PANDAS_GE_150 and self.group_keys:
return result.map_partitions(M.droplevel, self.by)
if PANDAS_GE_150 and self.group_keys:
return result.map_partitions(M.droplevel, self.by)

return result
return result

@derived_from(pd.core.groupby.GroupBy)
def ffill(self, limit=None):
Expand Down
3 changes: 2 additions & 1 deletion dask/dataframe/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
PANDAS_GE_200,
PANDAS_GE_210,
PANDAS_GE_220,
PANDAS_GE_300,
tm,
)
from dask.dataframe._pyarrow import to_pyarrow_string
Expand Down Expand Up @@ -4771,7 +4772,7 @@ def test_shift_with_freq_errors():
pytest.raises(NotImplementedError, lambda: ddf.index.shift(2))


@pytest.mark.skipif(DASK_EXPR_ENABLED, reason="deprecated in pandas")
@pytest.mark.skipif(DASK_EXPR_ENABLED or PANDAS_GE_300, reason="deprecated in pandas")
@pytest.mark.parametrize("method", ["first", "last"])
def test_first_and_last(method):
f = lambda x, offset: getattr(x, method)(offset)
Expand Down
3 changes: 2 additions & 1 deletion dask/dataframe/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PANDAS_GE_200,
PANDAS_GE_210,
PANDAS_GE_220,
PANDAS_GE_300,
check_nuisance_columns_warning,
check_numeric_only_deprecation,
check_observed_deprecation,
Expand Down Expand Up @@ -1241,7 +1242,7 @@ def test_aggregate_median(spec, keys, shuffle_method):
ddf.groupby(keys).median(shuffle_method=False).compute()


@pytest.mark.skipif(DASK_EXPR_ENABLED, reason="deprecated in pandas")
@pytest.mark.skipif(DASK_EXPR_ENABLED or PANDAS_GE_300, reason="deprecated in pandas")
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("group_keys", [True, False, None])
@pytest.mark.parametrize("limit", [None, 1, 4])
Expand Down

0 comments on commit ccab90b

Please sign in to comment.