
"""ML-Ensemble

:author: Sebastian Flennerhag
:copyright: 2017
:licence: MIT

Parallel processing job managers.
"""
import gc
import os
import shutil
import subprocess
import tempfile
import warnings

import numpy as np
from scipy.sparse import issparse, hstack

from . import Blender, Evaluation, SingleRun, Stacker, SubStacker
from .. import config
from ..externals.joblib import Parallel, dump, load
from ..utils import check_initialized
from ..utils.exceptions import (ParallelProcessingError,
                                ParallelProcessingWarning)


ENGINES = {'full': SingleRun,
           'stack': Stacker,
           'blend': Blender,
           'subset': SubStacker,
           'subsemble': SubStacker,
           'evaluation': Evaluation
           }

JOBS = ['predict', 'fit', 'transform', 'evaluate']


###############################################################################
def dump_array(array, name, dir):
    """Dump array for memmapping."""
    # First check if the array is on file
    if isinstance(array, str):
        # Load file from disk. Need to dump if not memmapped already
        if not array.split('.')[-1] in ['mmap', 'npy', 'npz']:
            # Try loading the file assuming a csv-like format
            array = _load(array)

    if isinstance(array, str):
        # If arr remains a string, it's pointing to an mmap file
        f = array
    else:
        # Dump ndarray on disk
        f = os.path.join(dir, '%s.mmap' % name)
        if os.path.exists(f):
            os.unlink(f)
        dump(array, f)

    return f
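
# Example (illustrative, not part of the original module): dump an in-memory
# array to a cache directory and receive the path to the memmap file.
#
#   >>> tmp = tempfile.mkdtemp()
#   >>> path = dump_array(np.arange(6.0).reshape(3, 2), 'X', tmp)
#   >>> os.path.basename(path)
#   'X.mmap'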

def _load(arr):
    """Load array from file using default settings."""
    if arr.split('.')[-1] in ['npy', 'npz']:
        return np.load(arr)
    else:
        try:
            return np.genfromtxt(arr)
        except Exception as e:
            raise IOError("Could not load X from %s, does not "
                          "appear to be a valid ndarray. "
                          "Details:\n%r" % (arr, e))


def _load_mmap(f):
    """Load a mmap presumably dumped by joblib, otherwise try numpy."""
    try:
        return load(f, mmap_mode='r')
    except (IndexError, KeyError):
        # Joblib's 'load' func fails on npy and npz: use numpy.load
        return np.load(f, mmap_mode='r')


def _check_job(job):
    """Check that a valid job is initialized."""
    if job not in JOBS:
        raise NotImplementedError('The job %s is not valid input '
                                  'for the ParallelProcessing job '
                                  'manager. Accepted jobs: %r.'
                                  % (job, list(JOBS)))
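
# Example (illustrative, not part of the original module): reload the memmap
# dumped above and reject an unsupported job name.
#
#   >>> _load_mmap(path).shape
#   (3, 2)
#   >>> _check_job('score')  # doctest: +ELLIPSIS
#   Traceback (most recent call last):
#       ...
#   NotImplementedError: The job score is not valid input ...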


###############################################################################
class Job(object):
    """Container class for holding job data.

    See Also
    --------
    :class:`ParallelProcessing`, :class:`ParallelEvaluation`
    """

    __slots__ = ['y', 'predict_in', 'predict_out', 'dir', 'job', 'tmp']

    def __init__(self, job):
        _check_job(job)
        self.job = job
        self.y = None
        self.predict_in = None
        self.predict_out = None
        self.tmp = None
        self.dir = None

    def update(self):
        """Shift output array to input array."""
        # Enforce csr on sparse matrices
        if issparse(self.predict_out) and not \
                self.predict_out.__class__.__name__.startswith('csr'):
            self.predict_out = self.predict_out.tocsr()

        self.predict_in = self.predict_out
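
# Example (illustrative, not part of the original module): a layer's output
# becomes the next layer's input after ``update``.
#
#   >>> job = Job('fit')
#   >>> job.predict_out = np.zeros((4, 2))
#   >>> job.update()
#   >>> job.predict_in is job.predict_out
#   True
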
###############################################################################
class BaseProcessor(object):
    """Base class for parallel processing engines."""

    __slots__ = ['caller', '__initialized__', '__threading__', 'job']

    def __init__(self, caller):
        self.job = None
        self.caller = caller
        self.__initialized__ = 0
        self.__threading__ = self.caller.backend == 'threading'

    def initialize(self, job, X, y=None, dir=None):
        """Create a job instance for estimation."""
        self.job = Job(job)

        if dir is None:
            dir = config.TMPDIR
        try:
            self.job.tmp = \
                tempfile.TemporaryDirectory(prefix=config.PREFIX, dir=dir)
            self.job.dir = self.job.tmp.name
        except AttributeError:
            # Fails on python 2
            self.job.dir = tempfile.mkdtemp(prefix=config.PREFIX, dir=dir)

        # --- Prepare inputs
        for name, arr in zip(('X', 'y'), (X, y)):

            if arr is None:
                continue

            # Dump data in cache
            if self.__threading__:
                # No need to memmap
                f = None
                if isinstance(arr, str):
                    arr = _load(arr)
            else:
                f = dump_array(arr, name, self.job.dir)

            # Store data for processing
            if name == 'y' and y is not None:
                self.job.y = arr if self.__threading__ else _load_mmap(f)
            else:
                self.job.predict_in = arr \
                    if self.__threading__ else _load_mmap(f)

        self.__initialized__ = 1
        gc.collect()

    def terminate(self):
        """Remove temporary folder and all cache data."""
        # Delete all contents from cache
        try:
            self.job.tmp.cleanup()
        except (AttributeError, OSError):
            # Fall back on shutil for python 2, can also fail on windows
            try:
                shutil.rmtree(self.job.dir)
            except OSError:
                # Can fail on windows, need to use the shell
                try:
                    subprocess.Popen('rmdir /S /Q %s' % self.job.dir,
                                     shell=True,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
                except OSError:
                    warnings.warn("Failed to delete cache at %s. "
                                  "If created with default settings, it "
                                  "will be removed on reboot; for immediate "
                                  "removal, delete the directory manually."
                                  % self.job.dir,
                                  ParallelProcessingWarning)
        finally:
            # Always release process memory
            del self.job
            gc.collect()

            if not len(gc.garbage) == 0:
                warnings.warn("Clearing process memory failed, "
                              "uncollected:\n%r." % gc.garbage,
                              ParallelProcessingWarning)

            self.__initialized__ = 0
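
# Example (illustrative, not part of the original module): the cache
# lifecycle driven by a minimal stand-in caller. ``_Caller`` is
# hypothetical; real callers are ensemble or evaluator instances.
#
#   >>> class _Caller(object):
#   ...     backend = 'threading'
#   >>> proc = BaseProcessor(_Caller())
#   >>> proc.initialize('fit', np.arange(8.).reshape(4, 2), np.ones(4))
#   >>> proc.job.predict_in.shape
#   (4, 2)
#   >>> proc.terminate()
#   >>> proc.__initialized__
#   0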


###############################################################################
class ParallelProcessing(BaseProcessor):
    """Parallel processing engine.

    Engine for running ensemble estimation.

    Parameters
    ----------
    caller : :class:`mlens.ensemble.base.LayerContainer`
        The ``LayerContainer`` that instantiated the processor.
    """

    def __init__(self, caller):
        super(ParallelProcessing, self).__init__(caller)

    def process(self):
        """Fit all layers in the attached :class:`LayerContainer`."""
        check_initialized(self)

        # Process each layer sequentially with the same worker pool
        with Parallel(n_jobs=self.caller.n_jobs,
                      temp_folder=self.job.dir,
                      max_nbytes=None,
                      mmap_mode='w+',
                      verbose=self.caller.verbose,
                      backend=self.caller.backend) as parallel:

            for name, lyr in self.caller.layers.items():
                # Process layer
                self._partial_process(name, lyr, parallel)

                # Update input array with output array
                self.job.update()

    def _partial_process(self, name, lyr, parallel):
        """Generate prediction matrix for a given :class:`layer`."""
        lyr.indexer.fit(self.job.predict_in, self.job.y, self.job.job)

        self._gen_prediction_array(name, lyr, self.__threading__)

        # Run estimation to populate prediction matrix
        kwd = lyr.cls_kwargs if lyr.cls_kwargs else {}
        engine = ENGINES[lyr.cls](self.job, lyr, **kwd)
        engine(parallel)

        # Propagate features from input to output
        if lyr.propagate_features is not None:
            self._propagate_features(lyr)

    def _propagate_features(self, lyr):
        """Propagate features from input array to output array."""
        p_out, p_in = self.job.predict_out, self.job.predict_in

        # Check for loss of obs between layers (i.e. blend)
        n_in, n_out = p_in.shape[0], p_out.shape[0]
        r = int(n_in - n_out)

        # Propagate features as the first n columns of the outgoing array
        if not issparse(p_in):
            # Simple item setting
            p_out[:, :lyr.n_feature_prop] = p_in[r:, lyr.propagate_features]
        else:
            # Need to populate propagated features using scipy sparse hstack
            self.job.predict_out = hstack(
                [p_in[r:, lyr.propagate_features],
                 p_out[:, lyr.n_feature_prop:]]
            ).tolil()

    def _gen_prediction_array(self, name, lyr, threading):
        """Generate prediction array either in-memory or persist to disk."""
        shape = self._get_lyr_sample_size(lyr)
        if threading:
            self.job.predict_out = np.empty(shape, dtype=lyr.dtype)
        else:
            f = os.path.join(self.job.dir, '%s.mmap' % name)
            try:
                self.job.predict_out = np.memmap(filename=f,
                                                 dtype=lyr.dtype,
                                                 mode='w+',
                                                 shape=shape)
            except Exception as exc:
                raise OSError("Cannot create prediction matrix of shape ("
                              "%i, %i), size %i MBs, for %s.\n Details:\n%r"
                              % (shape[0], shape[1],
                                 8 * shape[0] * shape[1] / (1024 ** 2),
                                 name, exc))

    def _get_lyr_sample_size(self, lyr):
        """Decide what sample size to create P with based on the job type."""
        s0 = lyr.indexer.n_test_samples if self.job.job != 'predict' else \
            lyr.indexer.n_samples

        # Number of prediction columns depends on:
        # 1. number of estimators in layer
        # 2. if ``predict_proba``, number of classes in training set
        # 3. number of subsets (default is one for all data)
        # 4. number of features to propagate
        # Note that 1., 3. and 4. are params but 2. is data dependent
        s1 = lyr.n_pred

        if lyr.proba:
            if self.job.job == 'fit':
                lyr.classes_ = np.unique(self.job.y).shape[0]
            s1 *= lyr.classes_

        if lyr.propagate_features is not None:
            s1 += lyr.n_feature_prop

        return s0, s1
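
    # Worked example (hypothetical numbers) for ``_get_lyr_sample_size``: a
    # layer with 3 estimators using ``predict_proba`` on a 4-class training
    # set while propagating 2 input features allocates
    # s1 = 3 * 4 + 2 = 14 prediction columns; s0 is ``n_test_samples``
    # during 'fit' and 'transform' and ``n_samples`` during 'predict'.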

    def get_preds(self, dtype=None, order='C'):
        """Return prediction matrix.

        Parameters
        ----------
        dtype : numpy dtype object, optional
            data type to return

        order : str (default = 'C')
            data order. See :func:`numpy.asarray` for details.
        """
        if not hasattr(self, 'job'):
            raise ParallelProcessingError("Processor has been terminated: "
                                          "cannot retrieve final prediction "
                                          "array from cache.")
        if dtype is None:
            dtype = self.caller.layers[self.caller.layer_names[-1]].dtype

        if issparse(self.job.predict_out):
            return self.job.predict_out
        else:
            return np.asarray(self.job.predict_out, dtype=dtype, order=order)

###############################################################################
class ParallelEvaluation(BaseProcessor):
    """Parallel cross-validation engine.

    Parameters
    ----------
    caller : :class:`Evaluator`
        The ``Evaluator`` that instantiated the processor.
    """

    def __init__(self, caller):
        super(ParallelEvaluation, self).__init__(caller)

    def process(self, attr):
        """Run an evaluation job on the attached :class:`Evaluator`."""
        check_initialized(self)

        # Use a context manager to ensure the same pool is used throughout
        with Parallel(n_jobs=self.caller.n_jobs,
                      temp_folder=self.job.dir,
                      max_nbytes=None,
                      mmap_mode='w+',
                      verbose=self.caller.verbose,
                      backend=self.caller.backend) as parallel:

            f = ENGINES['evaluation'](self.caller)
            getattr(f, attr)(parallel,
                             self.job.predict_in,
                             self.job.y,
                             self.job.dir)