"""ML-ENSEMBLE
:author: Sebastian Flennerhag
:copyright: 2017
:license: MIT
Collection of dummy estimator classes, Mixins to build transparent layers for
unit testing.
Also contains pre-made Layer, LayerContainers and data generation functions
for unit testing.
"""
from __future__ import division, print_function
import gc
import os
import shutil
from abc import abstractmethod
import numpy as np
import warnings
from ..externals.joblib import Parallel, dump, load
from ..parallel.manager import Job
from .exceptions import NotFittedError
from ..externals.sklearn.base import BaseEstimator, TransformerMixin, clone
from ..externals.sklearn.validation import check_X_y, check_array
from ..base import INDEXERS
from ..ensemble.base import Layer, LayerContainer
from ..parallel.manager import ENGINES
class OLS(BaseEstimator):
    """No frills vanilla OLS estimator implemented through the normal equation.

    MWE of a Scikit-learn estimator.

    OLS is a simple estimator designed to allow for total control over
    predictions in unit testing. It implements OLS through the Normal
    Equation, no learning takes place. The ``offset`` option allows
    the user to offset weights by a scalar value, if different instances
    should be differentiated in their predictions.

    Parameters
    ----------
    offset : float (default = 0)
        scalar value to add to the coefficient vector after fitting.

    Examples
    --------
    Asserting the OLS passes the Scikit-learn estimator test

    >>> from sklearn.utils.estimator_checks import check_estimator
    >>> from mlens.utils.dummy import OLS
    >>> check_estimator(OLS)
    """

    def __init__(self, offset=0):
        self.offset = offset

    def fit(self, X, y):
        """Fit coefficient vector via least squares.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            training data.
        y : array-like of shape (n_samples,)
            training targets.

        Returns
        -------
        self
        """
        X, y = check_X_y(X, y, accept_sparse=False)
        # rcond=None requests the machine-precision cutoff and avoids the
        # FutureWarning NumPy >= 1.14 raises for the legacy default.
        solution = np.linalg.lstsq(X, y, rcond=None)
        # Offset differentiates otherwise identical instances in tests.
        self.coef_ = solution[0] + self.offset
        self.resid_ = solution[1]
        return self

    def predict(self, X):
        """Predict with fitted weights.

        Raises
        ------
        NotFittedError
            if ``fit`` has not been called.
        """
        if not hasattr(self, 'coef_'):
            raise NotFittedError("Estimator not fitted. Call 'fit' first.")
        X = check_array(X, accept_sparse=False)
        return np.dot(X, self.coef_.T)
# FIXME: Needs a quality check!
class LogisticRegression(OLS):
    """No frill Logistic Regressor w. one-vs-rest estimation of P(label).

    MWE of a Scikit-learn classifier built as a wrapper around :class:`OLS`:
    one OLS model is fit per unique label, the raw OLS prediction is squashed
    through the Sigmoid function, and classification picks the label with the
    highest probability. The ``offset`` option offsets the underlying OLS
    weights so that different instances can be differentiated in their
    predictions.

    Examples
    --------
    Asserting the LogisticRegression passes the Scikit-learn estimator test

    >>> from sklearn.utils.estimator_checks import check_estimator
    >>> from mlens.utils.dummy import LogisticRegression
    >>> check_estimator(LogisticRegression)
    """

    def fit(self, X, y):
        """Fit one one-vs-rest model per unique label in ``y``."""
        X, y = check_X_y(X, y, accept_sparse=False)
        self.labels_ = np.unique(y)

        # One clone per label, fitted via the parent (OLS) fit on the
        # binary indicator target for that label.
        fitted = [super(LogisticRegression, clone(self)).fit(X, y == label)
                  for label in self.labels_]

        self._models_ = fitted
        self.coef_ = np.vstack([model.coef_ for model in fitted])
        return self

    def predict_proba(self, X):
        """Get probability predictions, one column per label."""
        if not hasattr(self, '_models_'):
            raise NotFittedError("Estimator not fitted. Call 'fit' first.")
        X = check_array(X, accept_sparse=False)
        # Sigmoid-squash each sub-model's raw OLS prediction.
        return np.column_stack(
            [1 / (1 + np.exp(-model._predict(X)))
             for model in self._models_])

    def _predict(self, X):
        """Raw (unsquashed) OLS prediction."""
        return super(LogisticRegression, self).predict(X)

    def predict(self, X):
        """Get label predictions."""
        if not hasattr(self, '_models_'):
            raise NotFittedError("Estimator not fitted. Call 'fit' first.")
        X = check_array(X, accept_sparse=False)
        probs = self.predict_proba(X)
        # Highest-probability label per row; cast to float to match the
        # float output array used previously.
        return np.asarray(self.labels_[probs.argmax(axis=1)], dtype=float)
class Scale(BaseEstimator, TransformerMixin):
    """Removes a learnt mean in a column-wise manner from an array.

    MWE of a Scikit-learn transformer, to be used for unit-tests of ensemble
    classes.

    Parameters
    ----------
    copy : bool (default = True)
        Whether to copy X before transforming.

    Examples
    --------
    Asserting :class:`Scale` passes the Scikit-learn estimator test

    >>> from sklearn.utils.estimator_checks import check_estimator
    >>> from mlens.utils.dummy import Scale
    >>> check_estimator(Scale)

    Scaling elements

    >>> from numpy import arange
    >>> from mlens.utils.dummy import Scale
    >>> X = arange(6).reshape(3, 2)
    >>> X[:, 1] *= 2
    >>> Scale().fit_transform(X)
    array([[-2., -4.],
           [ 0.,  0.],
           [ 2.,  4.]])
    """

    def __init__(self, copy=True):
        self.copy = copy
        # Explicit fitted flag checked by transform.
        self.__is_fitted__ = False

    def fit(self, X, y=None):
        """Estimate column means.

        Parameters
        ----------
        X : array-like
            training data to fit transformer on.

        y : array-like or None
            pass through for pipeline.

        Returns
        -------
        self
        """
        X = check_array(X, accept_sparse='csr')
        self.__is_fitted__ = True
        self.mean_ = X.mean(axis=0)
        return self

    def transform(self, X):
        """Subtract the learnt column means from ``X``.

        Added because ``TransformerMixin.fit_transform`` requires a
        ``transform`` method, which this class previously lacked.

        Parameters
        ----------
        X : array-like
            data to transform.

        Returns
        -------
        ndarray
            ``X`` with the fitted column means removed.

        Raises
        ------
        NotFittedError
            if ``fit`` has not been called.
        """
        if not self.__is_fitted__:
            raise NotFittedError("Transformer not fitted. Call 'fit' first.")
        X = check_array(X, accept_sparse='csr')
        Xt = X.copy() if self.copy else X
        # NOTE(review): subtracting a dense mean from a csr input densifies
        # the result — acceptable for the small test arrays used here.
        return Xt - self.mean_
class InitMixin(object):
    """Mixin to make a mlens ensemble behave as Scikit-learn estimator.

    Scikit-learn expects an estimator to be fully initialized when
    instantiated, but an ML-Ensemble estimator requires layers to be
    initialized before calling ``fit`` or ``predict`` makes sense.

    ``InitMixin`` is intended to be used to create temporary test classes
    of proper mlens ensemble classes that are identical to the parent class
    except that ``__init__`` will also initialize one layer with one
    estimator, and if applicable one meta estimator.

    To create a testing class, modify the ``__init__`` of the test class
    to call ``super().__init__`` as in the example below.

    Examples
    --------
    Assert the :class:`SuperLearner` passes the Scikit-learn estimator test

    >>> from sklearn.utils.estimator_checks import check_estimator
    >>> from mlens.ensemble import SuperLearner
    >>> from mlens.utils.dummy import InitMixin
    >>>
    >>> class TestSuperLearner(InitMixin, SuperLearner):
    ...
    ...     def __init__(self):
    ...         super(TestSuperLearner, self).__init__()
    >>>
    >>> check_estimator(TestSuperLearner)
    """

    @abstractmethod
    def __init__(self):
        # Initialize the parent ensemble class.
        super(InitMixin, self).__init__()

        # Scikit-learn's estimator checks are parallelized and expect
        # n_jobs = 1, so coerce the ensemble to the same behavior.
        self.n_jobs = 1

        # Seed the ensemble with two differentiated OLS estimators in the
        # first layer and an OLS meta estimator, unless layers already exist.
        if getattr(self, 'layers', None) is None:
            self.add([OLS(offset=1), OLS(offset=2)])
            self.add_meta(OLS())
###############################################################################
# Pre-made Layer and LayerContainer classes
# Preprocessing cases: 'no' applies no transformers, 'sc' scales inputs.
PREPROCESSING = {'no': [], 'sc': [('scale', Scale())]}
# Case-mapped estimators for layers WITH preprocessing (keys match
# PREPROCESSING); offsets differentiate instances in tests.
ESTIMATORS = {'sc': [('offs', OLS(offset=2))],
              'no': [('offs', OLS(offset=2)), ('null', OLS())]}
# Probabilistic counterpart of ESTIMATORS.
ESTIMATORS_PROBA = {'sc': [('offs', LogisticRegression(offset=2))],
                    'no': [('offs', LogisticRegression(offset=2)),
                           ('null', LogisticRegression())]}
# Flat estimator lists for layers WITHOUT preprocessing cases.
ECM = [('ols-%i' % i, OLS(offset=i)) for i in range(4)]
ECM_PROBA = [('lr-%i' % i, LogisticRegression(offset=i)) for i in range(4)]
###############################################################################
# Data generation functions and Layer estimation wrappers
class LayerGenerator(object):
    """Class for generating layer architectures of various types."""

    def __init__(self):
        pass

    def get_layer(self, kls, proba, preprocessing, *args, **kwargs):
        """Generate a layer instance.

        Parameters
        ----------
        kls : str
            class type (indexer name, e.g. ``'stack'``, ``'subset'``)

        proba : bool
            whether to set ``proba`` to ``True``

        preprocessing : bool
            layer with preprocessing cases
        """
        indexer, kwargs = self.load_indexer(kls, args, kwargs)

        # NOTE(review): remaining kwargs are not forwarded to Layer here
        # (unlike get_layer_container) — confirm this is intentional.
        partitions = 1 if kls != 'subset' else indexer.n_partitions

        if preprocessing:
            ests = ESTIMATORS_PROBA if proba else ESTIMATORS
            return Layer(estimators=ests,
                         cls=kls,
                         proba=proba,
                         indexer=indexer,
                         # np.float was removed in NumPy >= 1.24; float64 is
                         # equivalent and matches get_layer_container.
                         dtype=np.float64,
                         partitions=partitions,
                         preprocessing=PREPROCESSING)

        ests = ECM_PROBA if proba else ECM
        return Layer(estimators=ests,
                     cls=kls,
                     proba=proba,
                     indexer=indexer,
                     dtype=np.float64,
                     partitions=partitions)

    def get_layer_container(self, kls, proba, preprocessing, *args, **kwargs):
        """Generate a layer container instance.

        Parameters
        ----------
        kls : str
            class type (indexer name, e.g. ``'stack'``, ``'subset'``)

        proba : bool
            whether to set ``proba`` to ``True``

        preprocessing : bool
            layer with preprocessing cases
        """
        indexer, kwargs = self.load_indexer(kls, args, kwargs)

        if preprocessing:
            ests = ESTIMATORS_PROBA if proba else ESTIMATORS
            return LayerContainer().add(estimators=ests,
                                        cls=kls,
                                        proba=proba,
                                        indexer=indexer,
                                        preprocessing=PREPROCESSING,
                                        dtype=np.float64,
                                        **kwargs)

        ests = ECM_PROBA if proba else ECM
        return LayerContainer().add(estimators=ests,
                                    cls=kls,
                                    proba=proba,
                                    indexer=indexer,
                                    dtype=np.float64,
                                    **kwargs)

    @staticmethod
    def load_indexer(kls, args, kwargs):
        """Instantiate the indexer for ``kls`` and return remaining kwargs.

        Pops every keyword argument accepted by the indexer's ``__init__``
        out of ``kwargs`` (mutating it) and uses them, together with
        ``args``, to construct the indexer instance.
        """
        indexer_cls = INDEXERS[kls]
        idx_kwargs = {var: kwargs.pop(var)
                      for var in indexer_cls.__init__.__code__.co_varnames
                      if var in kwargs}
        return indexer_cls(*args, **idx_kwargs), kwargs
class Cache(object):
    """Object for controlling a temporary estimation cache on disk.

    Creates a ``tmp`` directory under the current working directory,
    memory-maps ``X`` and ``y`` into it, and allocates writable prediction
    memmaps for the ``fit``, ``predict`` and ``transform`` routines.
    """

    def __init__(self, X, y, data):
        path = os.path.join(os.getcwd(), 'tmp')
        try:
            # Wipe any stale cache left over from a previous run.
            shutil.rmtree(path)
        except Exception:
            pass
        os.mkdir(path)
        self.path = path

        # Dump the input arrays and reload them as read-only memmaps.
        paths = {}
        for name, arr in zip(('X', 'y'), (X, y)):
            f = os.path.join(path, '%s.mmap' % name)
            paths[name] = f
            if os.path.exists(f):
                os.unlink(f)
            dump(arr, f)
        X = load(paths['X'], mmap_mode='r')
        y = load(paths['y'], mmap_mode='r')

        if data.proba:
            self.classes_ = data.classes_

        # Writable prediction memmaps: fit/transform cover the test samples,
        # predict covers all samples.
        P_f = self._make_pred_mmap('Pf.mmap',
                                   data.indexer.n_test_samples, data)
        P_p = self._make_pred_mmap('Pp.mmap',
                                   data.indexer.n_samples, data)
        P_t = self._make_pred_mmap('Pt.mmap',
                                   data.indexer.n_test_samples, data)

        self.job = {'X': X,
                    'y': y,
                    'P_fit': P_f,
                    'P_predict': P_p,
                    'P_transform': P_t,
                    'dir': self.path}

    def _make_pred_mmap(self, fname, n_rows, data):
        """Allocate a writable float64 memmap of shape (n_rows, n_cols)."""
        f = os.path.join(self.path, fname)
        if os.path.exists(f):
            os.unlink(f)
        n_cols = data.n_pred
        if data.proba:
            # One column block per class for probability predictions.
            n_cols *= data.classes_
        return np.memmap(f, dtype=np.float64, shape=(n_rows, n_cols),
                         mode='w+')

    def store_X_y(self, X, y, as_csv=False):
        """Save X and y to file in the temporary directory.

        Returns
        -------
        tuple of str
            paths to the stored X and y files.
        """
        if not as_csv:
            xf, yf = (os.path.join(self.path, 'X_mapped.npy'),
                      os.path.join(self.path, 'y_mapped.npy'))
            np.save(xf, X)
            np.save(yf, y)
        else:
            xf, yf = (os.path.join(self.path, 'X_mapped.csv'),
                      os.path.join(self.path, 'y_mapped.csv'))
            np.savetxt(xf, X)
            np.savetxt(yf, y)
        return xf, yf

    def layer_est(self, layer, attr):
        """Run the estimation routine ``attr`` for ``layer``; return preds."""
        est = ENGINES[layer.cls]

        with Parallel(temp_folder=self.job['dir'],
                      mmap_mode='r+',
                      max_nbytes=None) as parallel:
            # Assemble the job for the requested routine.
            job = Job(attr)
            job.y = self.job['y']
            job.dir = self.job['dir']
            job.predict_in = self.job['X']

            # Route output to the memmap matching the routine.
            if attr == 'fit':
                job.predict_out = self.job['P_fit']
            elif attr == 'transform':
                job.predict_out = self.job['P_transform']
            else:
                job.predict_out = self.job['P_predict']

            if hasattr(self, 'classes_'):
                layer.classes_ = self.classes_

            e = est(layer=layer, job=job)
            e(parallel)

        # Materialize the output memmap as an in-memory ndarray.
        P = self.job['P_%s' % attr.split('_')[0]]
        preds = np.asarray(P)
        return preds

    def terminate(self):
        """Remove temporary items in directory during tests."""
        del self.job
        gc.collect()
        try:
            shutil.rmtree(self.path)
        except OSError:
            # Bug fix: the message previously interpolated the builtin
            # ``dir`` instead of the cache path.
            warnings.warn(
                "Failed to destroy temporary test cache at %s" % self.path)
        # Leave an empty cache directory in place for subsequent use.
        os.mkdir(self.path)
class Data(object):
    """Class for generating data and constructing estimation ground truths."""

    def __init__(self, cls, proba, preprocessing, *args, **kwargs):
        self.proba = proba
        self.preprocessing = preprocessing
        self.cls = cls
        self.indexer = INDEXERS[cls](*args, **kwargs)

    def get_data(self, shape, m):
        """Generate X and y data.

        Parameters
        ----------
        shape : tuple
            shape of data to be generated

        m : int
            length of step function for y

        Returns
        -------
        train : ndarray
            generated as a sequence reshaped to (LEN, WIDTH)

        labels : ndarray
            generated as a step-function with a step every ``m``. As such,
            each prediction fold during cross-validation has a unique
            level value.
        """
        s = shape[0]
        w = shape[1]
        train = np.array(range(int(s * w)), dtype='float').reshape((s, w))
        train += 1

        labels = np.zeros(train.shape[0])
        if not self.proba:
            # Step function: every block of m rows gets its own level.
            increment = 10
            for i in range(0, s, m):
                labels[i:i + m] += increment
                increment += 10
        else:
            # Alternating binary labels for probability estimation.
            labels = np.arange(train.shape[0]) % 2
        return train, labels

    def _set_up_est(self, y):
        """Get estimators, preprocessing, num_ests, predict attr to use."""
        attr = 'predict_proba' if self.proba else 'predict'
        labels = len(np.unique(y)) if self.proba else 1

        if self.preprocessing:
            ests = ESTIMATORS_PROBA if self.proba else ESTIMATORS
            prep = PREPROCESSING
            n_ests = sum(len(ests[case]) for case in ests)
        else:
            ests = {'no-case': ECM_PROBA if self.proba else ECM}
            prep = {'no-case': []}
            n_ests = len(ests['no-case'])

        self.classes_ = labels
        self.n_pred = n_ests
        if self.cls == 'subset':
            # Each partition contributes its own prediction block.
            self.n_pred *= self.indexer.n_partitions
        return ests, prep, n_ests, attr, labels

    def _folded_ests(self, X, y, subsets=1, verbose=True):
        """Build ground truth for each fold."""
        if verbose:
            print(' FOLD OUTPUT')
            print('-' * 100)
            print(' EST |'
                  ' TRI |'
                  ' TEI |'
                  ' TEST LABELS |'
                  ' TRAIN LABELS |'
                  ' COEF |'
                  ' PRED')

        ests, prep, n_ests, attr, labels = self._set_up_est(y)

        # Collect all test indices to size the fold prediction matrix.
        tei_all = [t for _, t in self.indexer.generate(X, True)]
        tei_all = np.unique(np.hstack(tei_all))
        tei_all.sort()

        weights = []
        # np.float was removed in NumPy >= 1.24; float64 is equivalent.
        F = np.zeros((len(tei_all), n_ests * subsets * labels),
                     dtype=np.float64)

        col_id = {}
        col_ass = 0

        # Sort at every occasion for deterministic column assignment.
        for key in sorted(prep):
            for i, (tri, tei) in enumerate(self.indexer.generate(X, True)):
                if subsets > 1:
                    i = i // self.indexer.n_splits
                else:
                    i = 0
                for est_name, est in ests[key]:
                    if '%s-%s-%s' % (i, key, est_name) not in col_id:
                        col_id['%s-%s-%s' % (i, key, est_name)] = col_ass
                        col_ass += labels

                    xtrain = X[tri]
                    xtest = X[tei]

                    # Transform inputs through the case's preprocessing.
                    for _, tr in prep[key]:
                        trans = clone(tr)
                        xtrain = trans.fit_transform(xtrain)
                        xtest = trans.transform(xtest)

                    # Fit estimator on the fold's training data.
                    e = clone(est).fit(xtrain, y[tri])
                    w = e.coef_
                    weights.append(w.tolist())

                    # Get out-of-sample predictions; rows are rebased since
                    # F covers only the test indices.
                    p = getattr(e, attr)(xtest)
                    rebase = X.shape[0] - F.shape[0]
                    fix = tei - rebase

                    if labels == 1:
                        F[fix, col_id['%s-%s-%s' % (i, key, est_name)]] = p
                    else:
                        c = col_id['%s-%s-%s' % (i, key, est_name)]
                        F[np.ix_(fix, np.arange(c, c + labels))] = p

                    # Best-effort diagnostics print; formatting failures
                    # (e.g. multi-dim p) are deliberately ignored.
                    try:
                        if verbose:
                            print('%s | %r | %r | %r | %r | %13r | %r' % (
                                '%s-%s' % (key, est_name),
                                list(tri),
                                list(tei),
                                [float('%.1f' % i) for i in y[tei]],
                                [float('%.1f' % i) for i in y[tri]],
                                [float('%.1f' % i) for i in w],
                                [float('%.1f' % i) for i in p]))
                    except Exception:
                        pass
        return F, weights

    def _full_ests(self, X, y, subsets=1, verbose=True):
        """Get ground truth for train and predict on full data."""
        if verbose:
            print('\n FULL PREDICTION OUTPUT')
            print('-' * 100)
            print(' EST |'
                  ' GROUND TRUTH |'
                  ' COEF |'
                  ' PRED')

        ests, prep, n_ests, attr, labels = self._set_up_est(y)

        if subsets == 1:
            # Single partition: train on the union of all training indices.
            tri = [t for t, _ in self.indexer.generate(X, True)]
            tri = np.unique(np.hstack(tri))
            indexer = DummyPartition(tri)
        else:
            indexer = self.indexer

        # np.float was removed in NumPy >= 1.24; float64 is equivalent.
        P = np.zeros((X.shape[0], n_ests * subsets * labels),
                     dtype=np.float64)
        weights = list()

        col_id = {}
        col_ass = 0
        for key in sorted(prep):
            for i, tri in enumerate(indexer.partition(as_array=True)):
                for est_name, est in ests[key]:
                    if '%s-%s-%s' % (i, key, est_name) not in col_id:
                        col_id['%s-%s-%s' % (i, key, est_name)] = col_ass
                        col_ass += labels

                    # Transform input through the case's preprocessing.
                    xtrain = X[tri]
                    ytrain = y[tri]
                    xtest = X
                    for _, tr in prep[key]:
                        trans = clone(tr)
                        xtrain = trans.fit_transform(xtrain)
                        xtest = trans.transform(xtest)

                    # Fit estimator on the (partition's) training set.
                    e = clone(est).fit(xtrain, ytrain)
                    w = e.coef_
                    weights.append(w.tolist())

                    # Predict on the full input matrix.
                    p = getattr(e, attr)(xtest)
                    c = col_id['%s-%s-%s' % (i, key, est_name)]
                    if labels == 1:
                        P[:, c] = p
                    else:
                        P[:, c:c + labels] = p

                    # Best-effort diagnostics print.
                    try:
                        if verbose:
                            print('%s | %r | %11r | %r' % (
                                '%s-%s' % (key, est_name),
                                [float('%.1f' % i) for i in y],
                                [float('%.1f' % i) for i in w],
                                [float('%.1f' % i) for i in p]))
                    except Exception:
                        pass
        return P, weights

    def ground_truth(self, X, y, subsets=1, verbose=False):
        """Set up an experiment ground truth.

        Returns
        -------
        (F, weights_f) : tuple
            Folded prediction array and fold-fit weights.
            (Docstring fix: previous description swapped F and P.)

        (P, weights_p) : tuple
            Full prediction array and full-fit weights.

        Raises
        ------
        AssertionError :
            Raises assertion error if any weight vectors overlap or any
            predictions (as measured by columns in F and P) are judged to be
            equal.
        """
        if verbose:
            print(' CONSTRUCTING GROUND TRUTH\n')

        # Number of estimator columns to uniqueness-check.
        N = 0
        for case in ESTIMATORS:
            N += len(ESTIMATORS[case])

        F, weights_f = self._folded_ests(X, y, subsets, verbose)
        P, weights_p = self._full_ests(X, y, subsets, verbose)

        if verbose:
            print('\n SUMMARY')
            print('-' * 42)

        col = 0
        for case in sorted(ESTIMATORS):
            for est_name, _ in ESTIMATORS[case]:
                if verbose:
                    print('%s | %6s: %20r' % (
                        '%s-%s' % (case, est_name), 'FULL',
                        [float('%.1f' % i) for i in P[:, col]]))
                    print('%s | %6s: %20r' % (
                        '%s-%s' % (case, est_name), 'FOLDS',
                        [float('%.1f' % i) for i in F[:, col]]))
                col += 1

        if verbose:
            print('GT : %r' % [float('%.1f' % i) for i in y])
            print('\nCHECKING UNIQUENESS...', end=' ')

        # No two prediction columns may be identical.
        for i in range(N):
            for j in range(N):
                if j > i:
                    if P.shape[0] == F.shape[0]:
                        assert not np.equal(P[:, i], P[:, j]).all()
                    assert not np.equal(F[:, i], F[:, j]).all()
                    assert not np.equal(P[:, i], F[:, j]).all()
                    assert not np.equal(F[:, i], P[:, j]).all()
        if verbose:
            print('OK.')

        return (F, weights_f), (P, weights_p)
class DummyPartition(object):
    """Minimal indexer stand-in exposing a single training-index partition."""

    def __init__(self, tri):
        # Full training index set to expose as the sole partition.
        self.tri = tri

    def partition(self, as_array=True):
        """Yield the stored index set (``as_array`` accepted for API parity)."""
        yield self.tri
###############################################################################
def layer_fit(layer, cache, F, wf):
    """Test the layer's fit method against ground truth ``F`` and ``wf``."""
    # Predictions must match the folded ground truth exactly.
    out = cache.layer_est(layer, 'fit')
    np.testing.assert_array_equal(out, F)

    # Check coefficients. Non-blend layers also store full-data fits up
    # front, so only the tail (fold fits) is compared.
    fitted = layer.estimators_
    if layer.cls != 'blend':
        fitted = fitted[layer.n_pred:]
    coefs = [entry[1].coef_.tolist() for _, entry in fitted]
    assert coefs == wf

    # Outputs and coefficients must be plain ndarrays (not memmaps).
    assert type(out).__name__ == 'ndarray'
    assert all(type(item[1][1].coef_).__name__ == 'ndarray'
               for item in layer.estimators_)
def layer_predict(layer, cache, P, wp):
    """Test the layer's predict method against ground truth ``P`` and ``wp``."""
    out = cache.layer_est(layer, 'predict')
    np.testing.assert_array_equal(out, P)

    # Prediction uses the leading n_pred estimators (the full-data fits).
    coefs = [entry[1].coef_.tolist()
             for _, entry in layer.estimators_[:layer.n_pred]]
    assert coefs == wp
def lc_fit(lc, X, y, F, wf):
    """Test the layer container's fit method."""
    out = lc.fit(X, y, return_preds=True)

    # Predictions must match the folded ground truth.
    np.testing.assert_array_equal(F, out[-1])

    # Non-blend layers also store full-data fits; compare fold fits only.
    layer = lc.layers['layer-1']
    fitted = layer.estimators_
    if layer.cls != 'blend':
        fitted = fitted[layer.n_pred:]
    coefs = [entry[1].coef_.tolist() for _, entry in fitted]
    assert coefs == wf

    # Outputs and coefficients must be plain ndarrays (not memmaps).
    assert type(out[-1]).__name__ == 'ndarray'
    assert all(type(item[1][1].coef_).__name__ == 'ndarray'
               for item in layer.estimators_)
def lc_predict(lc, X, P, wp):
    """Test the layer container's predict method."""
    np.testing.assert_array_equal(P, lc.predict(X))

    # Prediction uses the leading n_pred estimators (the full-data fits).
    layer = lc.layers['layer-1']
    coefs = [entry[1].coef_.tolist()
             for _, entry in layer.estimators_[:layer.n_pred]]
    assert coefs == wp
def lc_feature_prop(lc, X, y, F):
    """Test input feature propagation through the first layer."""
    layer = lc.layers["layer-1"]
    propagated = layer.propagate_features
    n = layer.n_feature_prop
    offset = X.shape[0] - F.shape[0]

    preds = lc.fit(X, y, return_preds=True)[1]

    # Leading columns carry the propagated input features ...
    np.testing.assert_array_equal(X[offset:, propagated], preds[:, :n])
    # ... followed by the layer's own predictions.
    np.testing.assert_array_equal(F, preds[:, n:])
def lc_from_file(lc, cache, X, y, F, wf, P, wp):
    """Fit a layer container from file paths to numpy arrays."""
    X_path, y_path = cache.store_X_y(X, y)

    # TEST FIT from file paths.
    out = lc.fit(X_path, y_path, return_preds=True)
    np.testing.assert_array_equal(F, out[-1])

    layer = lc.layers['layer-1']
    fitted = layer.estimators_
    if layer.cls != 'blend':
        fitted = fitted[layer.n_pred:]
    coefs = [entry[1].coef_.tolist() for _, entry in fitted]
    assert coefs == wf

    # TEST MMAP: outputs must have been materialized as plain ndarrays.
    assert type(out[-1]).__name__ == 'ndarray'
    assert all(type(item[1][1].coef_).__name__ == 'ndarray'
               for item in layer.estimators_)

    # TEST PREDICT from file path.
    out = lc.predict(X_path)
    np.testing.assert_array_equal(P, out)

    coefs = [entry[1].coef_.tolist()
             for _, entry in layer.estimators_[:layer.n_pred]]
    assert coefs == wp
def lc_from_csv(lc, cache, X, y, F, wf, P, wp):
    """Fit a layer container from file paths to csv."""
    X_path, y_path = cache.store_X_y(X, y, as_csv=True)

    # TEST FIT from csv paths.
    out = lc.fit(X_path, y_path, return_preds=True)
    np.testing.assert_array_equal(F, out[-1])

    layer = lc.layers['layer-1']
    fitted = layer.estimators_
    if layer.cls != 'blend':
        fitted = fitted[layer.n_pred:]
    coefs = [entry[1].coef_.tolist() for _, entry in fitted]
    assert coefs == wf

    # TEST MMAP: outputs must have been materialized as plain ndarrays.
    assert type(out[-1]).__name__ == 'ndarray'
    assert all(type(item[1][1].coef_).__name__ == 'ndarray'
               for item in layer.estimators_)

    # TEST PREDICT from csv path.
    out = lc.predict(X_path)
    np.testing.assert_array_equal(P, out)

    coefs = [entry[1].coef_.tolist()
             for _, entry in layer.estimators_[:layer.n_pred]]
    assert coefs == wp