Shuyib (18) [Avatar] Offline
#1
Hi,

I tried running this code from nyc data all of it Chapter 5: Cleaning and transforming dataframes. Please help debug this since this was a similar paradigm in the hello dask notebook and it worked on a subset of data. But doesn't worked in all the full DAG of the data.

# finding the number of missing values per column
missing_values = nyc_data_raw.isnull().sum()

# finding the percentage of missing values in each column
percent_missing = ((missing_values / nyc_data_raw.index.size) * 100)
print(percent_missing.compute()[:10])

# drop columns that meet this threshold that is, greater than or equal to 50 percent
columns_to_drop = missing_values[percent_missing >= 50].index

# remove columns that meet this threshold
nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)

My output:
# drop columns that meet this threshold that is, greater than or equal to 50 percent

columns_to_drop = missing_values[percent_missing >= 50].index

?

# remove columns that meet this threshold

nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)

?

# df_dropped = df.drop(columns_to_drop, axis=1).persist()

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/miniconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    136     try:
--> 137         yield
    138     except Exception as e:

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   3596     with raise_on_meta_error(funcname(func), udf=kwargs.pop('udf', False)):
-> 3597         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3598 

~/miniconda3/lib/python3.6/site-packages/dask/utils.py in __call__(self, obj, *args, **kwargs)
    693     def __call__(self, obj, *args, **kwargs):
--> 694         return getattr(obj, self.method)(*args, **kwargs)
    695 

~/miniconda3/lib/python3.6/site-packages/pandas/core/frame.py in drop(self, labels, axis, index, columns, level, inplace, errors)
   3696                                            level=level, inplace=inplace,
-> 3697                                            errors=errors)
   3698 

~/miniconda3/lib/python3.6/site-packages/pandas/core/generic.py in drop(self, labels, axis, index, columns, level, inplace, errors)
   3110             if labels is not None:
-> 3111                 obj = obj._drop_axis(labels, axis, level=level, errors=errors)
   3112 

~/miniconda3/lib/python3.6/site-packages/pandas/core/generic.py in _drop_axis(self, labels, axis, level, errors)
   3142             else:
-> 3143                 new_axis = axis.drop(labels, errors=errors)
   3144             result = self.reindex(**{axis_name: new_axis})

~/miniconda3/lib/python3.6/site-packages/pandas/core/indexes/base.py in drop(self, labels, errors)
   4403                 raise KeyError(
-> 4404                     '{} not found in axis'.format(labels[mask]))
   4405             indexer = indexer[~mask]

KeyError: "['a' 'b'] not found in axis"

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-9-c6cbaf1c3030> in <module>
      3 
      4 # remove columns that meet this threshold
----> 5 nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)
      6 
      7 # df_dropped = df.drop(columns_to_drop, axis=1).persist()

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in drop(self, labels, axis, errors)
   2824         axis = self._validate_axis(axis)
   2825         if axis == 1:
-> 2826             return self.map_partitions(M.drop, labels, axis=axis, errors=errors)
   2827         raise NotImplementedError("Drop currently only works for axis=1")
   2828 

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    541         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    542         """
--> 543         return map_partitions(func, self, *args, **kwargs)
    544 
    545     @insert_meta_param_description(pad=12)

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in map_partitions(func, *args, **kwargs)
   3634 
   3635     if meta is no_default:
-> 3636         meta = _emulate(func, *args, udf=True, **kwargs2)
   3637 
   3638     if all(isinstance(arg, Scalar) for arg in args):

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   3595     """
   3596     with raise_on_meta_error(funcname(func), udf=kwargs.pop('udf', False)):
-> 3597         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3598 
   3599 

~/miniconda3/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    152                 "{2}")
    153         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 154         raise ValueError(msg)
    155 
    156 

ValueError: Metadata inference failed in `drop`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError("['a' 'b'] not found in axis",)

Traceback:
---------
  File "/Users/.../miniconda3/lib/python3.6/site-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error
    yield
  File "/Users/.../miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py", line 3597, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Users/.../miniconda3/lib/python3.6/site-packages/dask/utils.py", line 694, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/frame.py", line 3697, in drop
    errors=errors)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/generic.py", line 3111, in drop
    obj = obj._drop_axis(labels, axis, level=level, errors=errors)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/generic.py", line 3143, in _drop_axis
    new_axis = axis.drop(labels, errors=errors)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4404, in drop
    '{} not found in axis'.format(labels[mask])


Made the code work when I read them in separate chunks cells on Jupyter with
dask 0.20.1 py36_0
dask-core 0.20.1 py36_0
Shuyib (18) [Avatar] Offline
#2
I've also tried with the example presented still getting errors for dropping columns with the word violation

# removing a column names that meet a certain criteria
violationColumnNames = filter(lambda columnName: 'Violation' in columnName, nyc_data_raw.columns)
 
with ProgressBar():
    print(nyc_data_raw.drop(violationColumnNames, axis=1).head())


Result

[                                        ] | 0% Completed |  2.3s

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-31-14fd89717e56> in <module>
      3 
      4 with ProgressBar():
----> 5     print(nyc_data_raw.drop(violationColumnNames, axis=1).head())
      6 
      7 # Produces the following output:

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    874 
    875         if compute:
--> 876             result = result.compute()
    877         return result
    878 

~/miniconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/miniconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    395     keys = [x.__dask_keys__() for x in collections]
    396     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397     results = schedule(dsk, keys, **kwargs)
    398     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    399 

~/miniconda3/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

~/miniconda3/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    499                         _execute_task(task, data)  # Re-execute locally
    500                     else:
--> 501                         raise_exception(exc, tb)
    502                 res, worker_id = loads(res_info)
    503                 state['cache'][key] = res

~/miniconda3/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
    110         if exc.__traceback__ is not tb:
    111             raise exc.with_traceback(tb)
--> 112         raise exc
    113 
    114 else:

~/miniconda3/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    270     try:
    271         task, data = loads(task_info)
--> 272         result = _execute_task(task, data)
    273         id = get_id()
    274         result = dumps((result, id))

~/miniconda3/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    250     elif istask(arg):
    251         func, args = arg[0], arg[1:]
--> 252         args2 = [_execute_task(a, cache) for a in args]
    253         return func(*args2)
    254     elif not ishashable(arg):

~/miniconda3/lib/python3.6/site-packages/dask/local.py in <listcomp>(.0)
    250     elif istask(arg):
    251         func, args = arg[0], arg[1:]
--> 252         args2 = [_execute_task(a, cache) for a in args]
    253         return func(*args2)
    254     elif not ishashable(arg):

~/miniconda3/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    251         func, args = arg[0], arg[1:]
    252         args2 = [_execute_task(a, cache) for a in args]
--> 253         return func(*args2)
    254     elif not ishashable(arg):
    255         return arg

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(func, args, kwargs, meta)
   3691             if not np.array_equal(np.nan_to_num(meta.columns),
   3692                                   np.nan_to_num(df.columns)):
-> 3693                 raise ValueError("The columns in the computed data do not match"
   3694                                  " the columns in the provided metadata")
   3695             else:

ValueError: The columns in the computed data do not match the columns in the provided metadata


Please advise. I'm currently using:

dask 0.20.1 py36_0
dask-core 0.20.1 py36_0


Jesse C. Daniel (4) [Avatar] Offline
#3
Hi Shuyib,

Thanks for bringing this to my attention. I'll take a look at the code and see if I can reproduce/solve the issue. Thanks for providing your Dask version numbers as well.

Best,
Jesse