Shuyib
#1
Hi,

I tried running this code from Chapter 5 (Cleaning and transforming dataframes) on the full NYC dataset. Please help me debug it: the same pattern worked in the hello dask notebook on a subset of the data, but it fails when I run the full DAG over all of the data.

# finding the number of missing values per column
missing_values = nyc_data_raw.isnull().sum()

# finding the percentage of missing values in each column
percent_missing = ((missing_values / nyc_data_raw.index.size) * 100)
print(percent_missing.compute()[:10])

# find columns where at least 50 percent of the values are missing
columns_to_drop = missing_values[percent_missing >= 50].index

# remove columns that meet this threshold
nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)

My output:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/miniconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    136     try:
--> 137         yield
    138     except Exception as e:

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   3596     with raise_on_meta_error(funcname(func), udf=kwargs.pop('udf', False)):
-> 3597         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3598 

~/miniconda3/lib/python3.6/site-packages/dask/utils.py in __call__(self, obj, *args, **kwargs)
    693     def __call__(self, obj, *args, **kwargs):
--> 694         return getattr(obj, self.method)(*args, **kwargs)
    695 

~/miniconda3/lib/python3.6/site-packages/pandas/core/frame.py in drop(self, labels, axis, index, columns, level, inplace, errors)
   3696                                            level=level, inplace=inplace,
-> 3697                                            errors=errors)
   3698 

~/miniconda3/lib/python3.6/site-packages/pandas/core/generic.py in drop(self, labels, axis, index, columns, level, inplace, errors)
   3110             if labels is not None:
-> 3111                 obj = obj._drop_axis(labels, axis, level=level, errors=errors)
   3112 

~/miniconda3/lib/python3.6/site-packages/pandas/core/generic.py in _drop_axis(self, labels, axis, level, errors)
   3142             else:
-> 3143                 new_axis = axis.drop(labels, errors=errors)
   3144             result = self.reindex(**{axis_name: new_axis})

~/miniconda3/lib/python3.6/site-packages/pandas/core/indexes/base.py in drop(self, labels, errors)
   4403                 raise KeyError(
-> 4404                     '{} not found in axis'.format(labels[mask]))
   4405             indexer = indexer[~mask]

KeyError: "['a' 'b'] not found in axis"

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-9-c6cbaf1c3030> in <module>
      3 
      4 # remove columns that meet this threshold
----> 5 nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)
      6 
      7 # df_dropped = df.drop(columns_to_drop, axis=1).persist()

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in drop(self, labels, axis, errors)
   2824         axis = self._validate_axis(axis)
   2825         if axis == 1:
-> 2826             return self.map_partitions(M.drop, labels, axis=axis, errors=errors)
   2827         raise NotImplementedError("Drop currently only works for axis=1")
   2828 

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    541         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    542         """
--> 543         return map_partitions(func, self, *args, **kwargs)
    544 
    545     @insert_meta_param_description(pad=12)

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in map_partitions(func, *args, **kwargs)
   3634 
   3635     if meta is no_default:
-> 3636         meta = _emulate(func, *args, udf=True, **kwargs2)
   3637 
   3638     if all(isinstance(arg, Scalar) for arg in args):

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in _emulate(func, *args, **kwargs)
   3595     """
   3596     with raise_on_meta_error(funcname(func), udf=kwargs.pop('udf', False)):
-> 3597         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3598 
   3599 

~/miniconda3/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/utils.py in raise_on_meta_error(funcname, udf)
    152                 "{2}")
    153         msg = msg.format(" in `{0}`".format(funcname) if funcname else "", repr(e), tb)
--> 154         raise ValueError(msg)
    155 
    156 

ValueError: Metadata inference failed in `drop`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError("['a' 'b'] not found in axis",)

Traceback:
---------
  File "/Users/.../miniconda3/lib/python3.6/site-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error
    yield
  File "/Users/.../miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py", line 3597, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/Users/.../miniconda3/lib/python3.6/site-packages/dask/utils.py", line 694, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/frame.py", line 3697, in drop
    errors=errors)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/generic.py", line 3111, in drop
    obj = obj._drop_axis(labels, axis, level=level, errors=errors)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/generic.py", line 3143, in _drop_axis
    new_axis = axis.drop(labels, errors=errors)
  File "/Users/.../miniconda3/lib/python3.6/site-packages/pandas/core/indexes/base.py", line 4404, in drop
    '{} not found in axis'.format(labels[mask])


I made the code work when I ran the steps in separate cells in Jupyter, with:
dask 0.20.1 py36_0
dask-core 0.20.1 py36_0
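
Roughly, the version that worked split the computation so that columns_to_drop is a concrete pandas Index before drop is called (a sketch from memory, not my exact cells):

# compute the missing-value percentages up front so the result is a pandas Series
missing_values = nyc_data_raw.isnull().sum()
percent_missing = ((missing_values / nyc_data_raw.index.size) * 100).compute()

# columns_to_drop is now a plain pandas Index, not a lazy Dask object
columns_to_drop = percent_missing[percent_missing >= 50].index

nyc_data_clean_stage1 = nyc_data_raw.drop(columns_to_drop, axis=1)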
Shuyib
#2
I've also tried the example presented in the book and am still getting errors when dropping the columns containing the word "Violation":

# remove column names that meet a certain criterion
violationColumnNames = filter(lambda columnName: 'Violation' in columnName, nyc_data_raw.columns)
 
with ProgressBar():
    print(nyc_data_raw.drop(violationColumnNames, axis=1).head())


Result

[                                        ] | 0% Completed |  2.3s

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-31-14fd89717e56> in <module>
      3 
      4 with ProgressBar():
----> 5     print(nyc_data_raw.drop(violationColumnNames, axis=1).head())
      6 
      7 # Produces the following output:

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
    874 
    875         if compute:
--> 876             result = result.compute()
    877         return result
    878 

~/miniconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/miniconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    395     keys = [x.__dask_keys__() for x in collections]
    396     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 397     results = schedule(dsk, keys, **kwargs)
    398     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    399 

~/miniconda3/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

~/miniconda3/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    499                         _execute_task(task, data)  # Re-execute locally
    500                     else:
--> 501                         raise_exception(exc, tb)
    502                 res, worker_id = loads(res_info)
    503                 state['cache'][key] = res

~/miniconda3/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
    110         if exc.__traceback__ is not tb:
    111             raise exc.with_traceback(tb)
--> 112         raise exc
    113 
    114 else:

~/miniconda3/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    270     try:
    271         task, data = loads(task_info)
--> 272         result = _execute_task(task, data)
    273         id = get_id()
    274         result = dumps((result, id))

~/miniconda3/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    250     elif istask(arg):
    251         func, args = arg[0], arg[1:]
--> 252         args2 = [_execute_task(a, cache) for a in args]
    253         return func(*args2)
    254     elif not ishashable(arg):

~/miniconda3/lib/python3.6/site-packages/dask/local.py in <listcomp>(.0)
    250     elif istask(arg):
    251         func, args = arg[0], arg[1:]
--> 252         args2 = [_execute_task(a, cache) for a in args]
    253         return func(*args2)
    254     elif not ishashable(arg):

~/miniconda3/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    251         func, args = arg[0], arg[1:]
    252         args2 = [_execute_task(a, cache) for a in args]
--> 253         return func(*args2)
    254     elif not ishashable(arg):
    255         return arg

~/miniconda3/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(func, args, kwargs, meta)
   3691             if not np.array_equal(np.nan_to_num(meta.columns),
   3692                                   np.nan_to_num(df.columns)):
-> 3693                 raise ValueError("The columns in the computed data do not match"
   3694                                  " the columns in the provided metadata")
   3695             else:

ValueError: The columns in the computed data do not match the columns in the provided metadata


Please advise. I'm currently using:

dask 0.20.1 py36_0
dask-core 0.20.1 py36_0


Jesse C. Daniel
#3
Hi Shuyib,

Thanks for bringing this to my attention. I'll take a look at the code and see if I can reproduce/solve the issue. Thanks for providing your Dask version numbers as well.

Best,
Jesse
Jesse C. Daniel
#4
Hi Shuyib,

I'm very sorry for taking so long to respond. I've been hard at work finishing the final chapters of the book, which should be available soon!

I'm trying to reproduce your error and I'm not having any success. Can you upload the whole notebook/.py file you're working with so I can step through it with a debugger?

Thanks!
Shuyib
#5
Hi Jesse,

Here's the notebook you requested: https://drive.google.com/open?id=1NmNysYPV_yGpMu6FfS7eRvdMJIFO3WPW. Please let me know if you need anything else.

Jesse C. Daniel
#6
Hi Shuyib,

I can't reproduce the issue in your first post, even using the same version of Dask that you were using. I ran all the cells in the notebook you sent, and the drop code didn't produce any errors. My guess from the stack trace is that there were some entries in the columns_to_drop list that weren't in the dataframe. That could have happened if you ran some other code in a different cell that changed the contents of columns_to_drop. Do you still get the error if you restart the kernel and run all cells in order?
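
If you want to guard against stale labels in the meantime, one defensive option (not from the book, just a sketch) is to keep only the labels that actually exist in the dataframe before dropping; passing errors='ignore' to drop would have a similar effect:

# filter columns_to_drop down to labels that are present in the dataframe
safe_to_drop = [col for col in columns_to_drop if col in nyc_data_raw.columns]
nyc_data_clean_stage1 = nyc_data_raw.drop(safe_to_drop, axis=1)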

As for your second post, it looks like there is an issue with the Chapter 3 code. The code should read:

violationColumnNames = list(filter(lambda columnName: 'Violation' in columnName, nyc_data_raw.columns))


This makes the code compatible with Python 3 by explicitly converting the filter iterator to a list (in Python 2, filter returns a list by default rather than an iterator). Good catch! I've updated the code in the Chapter 3 notebook, and the update will be released in the next update cycle.
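
For anyone curious why the iterator causes the metadata mismatch: a filter object can only be consumed once, so Dask presumably exhausts it while inferring metadata and then has no labels left to drop from the real partitions. A quick illustration of the one-shot behavior:

names = filter(lambda c: 'Violation' in c, ['Violation Code', 'Street Name'])
print(list(names))  # ['Violation Code']
print(list(names))  # [] -- the iterator is already exhausted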

Thanks,
Jesse
Shuyib
#7
Hi Jesse,

I concur with your first observation; the code is working fine now, and it also works in the provided notebook. I've also tested the code without the filter function, and I've confirmed with `transpose()` that all the Violation columns have been removed.

If I find anything else I'll let you know here.
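
For reference, instead of eyeballing the transposed output, the remaining column names can be scanned directly (a quick sketch using only the names from earlier in the thread; .columns is metadata, so no compute is needed):

# expect an empty list if every Violation column was dropped
remaining = [col for col in nyc_data_raw.drop(violationColumnNames, axis=1).columns
             if 'Violation' in col]
print(remaining)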