"""ML-ENSEMBLE
:author: Sebastian Flennerhag
:copyright: 2017
:licence: MIT
"""
from __future__ import division, print_function, with_statement
from numpy import array
import subprocess
import sys
import os
try:
import psutil
except ImportError:
psutil = None
try:
import cPickle as pickle
except ImportError:
import pickle
from time import sleep
try:
from time import perf_counter as _time
except ImportError:
# Fall back on time for older versions
from time import time as _time
###############################################################################
[docs]def pickle_save(obj, name):
"""Utility function for pickling an object"""
with open(name + '.pkl', 'wb') as f:
pickle.dump(obj, f)
[docs]def pickle_load(name):
"""Utility function for loading pickled object"""
with open(name + '.pkl', 'rb') as f:
return pickle.load(f)
###############################################################################
[docs]def kwarg_parser(func, kwargs):
"""Utility function for parsing keyword arguments"""
func_kwargs = dict()
args = func.__code__.co_varnames
for arg in args:
if arg in kwargs:
func_kwargs[arg] = kwargs.pop(arg)
return func_kwargs, kwargs
[docs]def safe_print(*objects, **kwargs):
"""Safe print function for backwards compatibility."""
# Get stream
file = kwargs.pop('file', sys.stdout)
if isinstance(file, str):
file = getattr(sys, file)
# Get flush
flush = kwargs.pop('flush', False)
# Print
print(*objects, file=file, **kwargs)
# Need to flush outside print function for python2 compatibility
if flush:
file.flush()
[docs]def print_time(t0, message='', **kwargs):
"""Utility function for printing time"""
if len(message) > 0:
message += ' | '
m, s = divmod(_time() - t0, 60)
h, m = divmod(m, 60)
safe_print(message + '%02d:%02d:%02d\n' % (h, m, s), **kwargs)
[docs]class CMLog(object):
"""CPU and Memory logger.
Class for starting a monitor job of CPU and memory utilization in the
background in a Python script. The ``monitor`` class records the
``cpu_percent``, ``rss`` and ``vms`` as collected by the
psutil_ library for the parent process' pid.
CPU usage and memory utilization are stored as attributes in numpy arrays.
.. _psutil: https://pypi.python.org/pypi/psutil
Examples
--------
>>> from time import sleep
>>> from mlens.utils.utils import CMLog
>>> cm = CMLog(verbose=True)
>>> cm.monitor(2, 0.5)
>>> _ = [i for i in range(10000000)]
>>>
>>> # Collecting before completion triggers a message but no error
>>> cm._collect()
>>>
>>> sleep(2)
>>> cm._collect()
>>> print('CPU usage:')
>>> cm.cpu
[CMLog] Monitoring for 2 seconds with checks every 0.5 seconds.
[CMLog] Job not finished. Cannot _collect yet.
[CMLog] Collecting... done. Read 4 lines in 0.000 seconds.
CPU usage:
array([ 50. , 22.4, 6. , 11.9])
Raises
------
ImportError :
Depends on psutil. If not installed, raises ImportError on
instantiation.
Parameters
----------
verbose : bool
whether to notify of job start.
"""
def __init__(self, verbose=False):
self.verbose = verbose
self.pid = os.getpid()
if psutil is None:
raise ImportError("psutil not installed. Install psutil, for "
"example through pip (pip install psutil) "
"before initializing CMLog.")
[docs] def monitor(self, stop=None, ival=0.1, kill=True):
"""Start monitoring CPU and memory usage.
Parameters
----------
stop : float or None (default = None)
seconds to monitor for. If None, monitors until ``_collect`` is
called.
ival : float (default=0.1)
interval of monitoring.
kill : bool (default = True)
whether to kill the monitoring job if ``_collect`` is called before
timeout (``stop``). If set to False, calling ``_collect`` will
cause the instance to wait until the job completes.
"""
if stop is None and not kill:
raise ValueError("If no time limit is set 'kill' must be enabled.")
self._t0 = _time()
self._stop = stop
self._kill = kill
# Delete previous job data to avoid confusion
try:
del self.cpu
del self.rss
del self.vms
except AttributeError:
pass
if self.verbose:
if self._stop is not None:
safe_print("[CMLog] Monitoring for {} seconds with checks "
"every {} seconds.".format(stop, ival))
else:
safe_print("[CMLog] Monitoring until collection with checks "
"every {} seconds.".format(ival))
# Initialize subprocess
self._out = \
subprocess.Popen([sys.executable, '-c',
'from mlens.utils.utils import _recorder; '
'_recorder({}, {}, {})'.format(self.pid,
stop,
float(ival))],
stdout=subprocess.PIPE)
return
[docs] def collect(self):
"""Collect monitored data.
Once a monitor job finishes, call ``_collect`` to read the CPU and
memory usage into python objects in the current process. If called
before the job finishes, _collect issues a print statement to try
again later, but no warning or error is raised.
"""
if not hasattr(self, '_stop'):
safe_print('No monitoring job initiated: nothing to _collect.')
return
if self._stop is None:
# If no timer, kill process.
self._out.kill()
if self.verbose:
safe_print("[CMLog] Collecting...", end=" ", flush=True)
# Check if job is not completed and if so, check whether to kill
elif _time() - self._t0 < self._stop:
if self._kill:
if self.verbose:
safe_print("[CMLog] Job not finished - killing process "
"and collecting...", end=" ", flush=True)
self._out.kill()
elif self.verbose:
# Wait until completion (this is done in the 'communicate'
# command
safe_print("[CMLog] Job not finished - waiting "
"until completion and collecting...",
end=" ", flush=True)
# If job done, we just need to _collect
elif self.verbose:
safe_print("[CMLog] Collecting...", end=" ", flush=True)
t0, i = _time(), 0
cpu, rss, vms = [], [], []
out = self._out.communicate()
out = out[0].decode().strip().split('\n')
for line in out:
c, r, v = line.split(',')
cpu.append(float(c.strip()))
rss.append(int(r.strip()))
vms.append(int(v.strip()))
i += 1
if self.verbose:
safe_print('done. Read {} lines in '
'{:.3f} seconds.'.format(i, _time() - t0))
self.cpu = array(cpu)
self.vms = array(vms)
self.rss = array(rss)
# Clear job data
del self._t0
del self._stop
del self._kill
self._out.terminate()
del self._out
def _recorder(pid, stop, ival):
"""Subprocess call function to record cpu and memory."""
t = t0 = _time()
process = psutil.Process(pid)
if stop is None:
while True:
m = process.memory_info()
print(psutil.cpu_percent(), ',', m[0], ',', m[1])
sleep(ival)
t = _time()
else:
while t - t0 < stop:
m = process.memory_info()
print(psutil.cpu_percent(), ',', m[0], ',', m[1])
sleep(ival)
t = _time()