from __future__ import unicode_literals, print_function
import os
import re
import sys
import stat
import zlib
import time
import types
import base64
import shutil
import pathlib
import inspect
import logging
import functools
import posixpath
from subprocess import Popen
from subprocess import PIPE
from subprocess import STDOUT
from copy import deepcopy
from ConfigObject import ConfigObject
from contextlib import contextmanager
# Fabric is optional: record whether its API may be used by the Fab helper.
try:
    from fabric import api as fabric
except ImportError:
    HAS_FABRIC = False
else:  # pragma: no cover
    # Even when the import works, the integration stays off if the process
    # was launched through nosetests.
    HAS_FABRIC = 'nosetests' not in sys.argv[0]
# Names handed out by ModuleWrapper when ``__all__`` is requested on the
# ``chut`` wrapper module.
__all__ = [
    'logopts', 'info', 'debug', 'error', 'exc',  # logging
    'console_script', 'requires', 'sh', 'pipe', 'env', 'ini', 'stdin', 'test',
    'ls', 'cat', 'grep', 'find', 'cut', 'tr', 'head', 'tail', 'sed', 'awk',
    'nc', 'ping', 'nmap', 'hostname', 'host', 'rsync', 'wget', 'curl',
    'cd', 'which', 'mktemp', 'echo', 'wc',
    'tar', 'gzip', 'gunzip', 'zip', 'unzip',
    'vlc', 'ffmpeg', 'convert',
    'virtualenv', 'pip',
    'git', 'hg', 'svn',
    'ssh', 'sudo',
    'path', 'pwd',  # path is posixpath, pwd return os.getcwd()
    'escape', 'e',  # e is escape()
]

# Commands whose generated Pipe class gets ``_pipe = False`` (see
# Base.__getattr__): Pipe.__init__ runs them as soon as they are instantiated.
__not_piped__ = ['chmod', 'cp', 'scp', 'mkdir', 'mv', 'rm', 'rmdir', 'touch']
__all__ += __not_piped__

# Library logger used for debug traces and error reports.
log = logging.getLogger('chut')

# Command name -> binary to run instead of the bare name.
aliases = dict(
    ifconfig='/sbin/ifconfig',
    sudo='/usr/bin/sudo',
    ssh='ssh',
)
class Log(object):
    """Callable that configures, caches and returns loggers by name."""

    initialized = False
    # name -> configured logger; the dict is shared by every Log instance
    loggers = {}

    def __call__(self, args=None, fmt=None, name=None, stream=sys.stderr):
        """Return the logger called ``name``, configuring it on first use.

        :param args: mapping of command line options; only ``--quiet`` and
            ``--debug`` are looked up. ``None`` means no option is set.
        :param fmt: ``'brief'``, ``'msg'`` or anything else for the long
            timestamped format.
        :param name: logger name; defaults to the basename of ``sys.argv[0]``.
        :param stream: stream handed to ``logging.basicConfig``.
        """
        if args is None:
            # a fresh dict per call instead of a shared mutable default
            args = {}
        if not name:
            name = posixpath.basename(sys.argv[0])
        if name in self.loggers:
            return self.loggers[name]
        log = logging.getLogger(name)
        if not log.handlers:
            if fmt == 'brief':
                fmt = '[%(levelname)-4s] %(message)s'
            elif fmt == 'msg':
                fmt = '%(message)s'
            else:
                fmt = '%(asctime)s %(levelname)-6s %(name)s %(message)s'
            logging.basicConfig(stream=stream, format=fmt)
        if not log.level:  # pragma: no cover
            # only pick a level when none was set explicitly before
            if args.get('--quiet'):
                level = logging.ERROR
            elif args.get('--debug'):
                level = logging.DEBUG
            else:
                level = logging.INFO
            log.setLevel(level)
        self.loggers[name] = log
        return log

    def log(self, name):
        """Return a function forwarding its arguments to the ``name`` method
        (``'info'``, ``'error'``...) of the default logger."""
        def wrapper(*args, **kwargs):
            log = self()
            return getattr(log, str(name))(*args, **kwargs)
        wrapper.__name__ = str(name)
        return wrapper
def check_sudo():
    """Verify that the aliased sudo binary exists and yields a root shell.

    Runs ``sudo -s whoami`` with the module environment.

    :raises OSError: when the binary is missing or the command does not
        print ``root``.
    """
    sudo = aliases.get('sudo')
    if not sudo or not os.path.isfile(sudo):
        raise OSError('sudo is not installed')
    args = [sudo, '-s', 'whoami']
    kwargs = dict(stdout=PIPE, stderr=STDOUT)
    log.debug('Popen(%r, **%r)', args, kwargs)
    process = Popen(args, env=env, **kwargs)
    # communicate() drains the pipe while waiting and closes it afterwards
    output, _ = process.communicate()
    if output.strip() != b'root':
        raise OSError('Not able to run sudo.')
def escape(value):
    """Backslash-escape shell-sensitive characters in ``value``.

    The escaped characters are ``| ! ` ' [ ] ( )`` and the space. Works on
    both text and bytes and returns the same type it was given.
    """
    chars = "|!`'[]() "
    esc = '\\'
    if isinstance(value, bytes):
        chars = chars.encode('ascii')
        esc = esc.encode('ascii')
    for i in range(len(chars)):
        # slice rather than index: indexing bytes yields an int on Python 3
        c = chars[i:i + 1]
        value = value.replace(c, esc + c)
    return value
def ini(filename, **defaults):
    """Load a .ini file in a ConfigObject. Does not raise if the file does
    not exist.

    ``filename`` goes through ``sh.path`` (user and variable expansion) and
    the ``home`` default is always set to the expanded ``~``.
    """
    filename = sh.path(filename)
    defaults.update(home=sh.path('~'))
    return ConfigObject(filename=filename, defaults=defaults)
class Environ(dict):
    """Manage os.environ

    Keys are reachable as case-insensitive attributes: ``env.home`` reads
    ``env['HOME']``. ``path`` is exposed as a list split on ``os.pathsep``.
    """

    def __getattr__(self, attr):
        """Return the variable ``attr.upper()`` or ``None`` when unset."""
        value = self.get(attr.upper(), None)
        if attr.lower() in ('path',):
            if value is None:
                # an unset PATH is an empty search path, not an error
                return []
            return value.split(os.pathsep)
        return value

    def __setattr__(self, attr, value):
        """Store ``value`` under ``attr.upper()``; lists and tuples are
        joined with ``os.pathsep``."""
        if isinstance(value, (list, tuple)):
            value = os.pathsep.join(value)
        self[attr.upper()] = value

    def __delattr__(self, attr):
        """Drop ``attr.upper()`` if present; a missing key is ignored."""
        attr = attr.upper()
        if attr in self:
            del self[attr]

    def copy(self, **kwargs):
        """Return a new Environ with ``kwargs`` applied on top of this one."""
        environ = self.__class__(self)
        # building the ChangeEnviron applies the changes to the copy
        environ(**kwargs)
        return environ

    def __call__(self, **kwargs):
        """Apply ``kwargs`` and return a context manager that undoes them."""
        return ChangeEnviron(self, **kwargs)
class ChangeEnviron:
    """Apply changes to an environment and remember the previous state so
    it can be put back. Intended for use in a ``with`` statement; note that
    the changes are applied as soon as the object is created."""

    def __init__(self, env, **kwargs):
        # snapshot first: __exit__ restores from this copy
        self._prevenv = Environ(env)
        self._env = env
        for name in kwargs:
            new_value = kwargs[name]
            if new_value is not None:
                setattr(self._env, name, new_value)
            else:
                # None means "unset this variable"
                delattr(self._env, name)

    def __enter__(self):
        return self._env

    def __exit__(self, type, value, traceback):
        current = self._env
        # bring back every value that existed before...
        current.update(self._prevenv)
        # ...then forget the keys that were introduced meanwhile
        added = [key for key in current if key not in self._prevenv]
        for key in added:
            del current[key]
class Path(object):
    """Callable path helper that also proxies ``posixpath`` functions."""

    def __getattr__(self, attr):
        """Forward unknown attributes to the ``posixpath`` module."""
        return getattr(posixpath, attr)

    def lib(self, *args):
        """Same as calling the object, but return a ``pathlib.Path``."""
        return self(*args, obj=True)

    @classmethod
    def __call__(cls, *args, **kwargs):
        """Join ``args`` and expand ``~`` and environment variables.

        Returns a string, or a ``pathlib.Path`` when an ``obj`` or ``object``
        keyword is present and the resulting path is not empty. Without
        positional arguments the result is an empty string.
        """
        if args:
            value = posixpath.expandvars(
                posixpath.expanduser(
                    posixpath.join(*args)))
        else:
            value = str()
        # parenthesised so the non-empty guard covers both keywords
        if value and ('obj' in kwargs or 'object' in kwargs):
            value = pathlib.Path(value)
        return value
class Pipe(object):
    """A pipe object. Represent a set of one or more commands.

    Commands are linked through ``previous``; the pipeline is spawned lazily
    when its output, status or string value is requested.
    """

    _chut = None
    _pipe = True
    _cmd_args = []
    # streams used by ``> 1`` / ``> 2`` redirections
    _sys_stdout = sys.stdout
    _sys_stderr = sys.stderr

    def __init__(self, *args, **kwargs):
        """Store the command arguments and normalise the options.

        Recognised keywords: ``encoding`` (used to decode output), ``sh``
        (alias of ``shell``), ``stderr=1`` or ``combine_stderr`` (merge
        stderr into stdout) and ``pipe`` (``pipe=False`` runs the command
        immediately). Remaining keywords are passed to ``Popen``.
        """
        self._done = False
        self._stdout = None
        self._stderr = None
        self.args = list(args)
        self.previous = None
        self.processes = []
        encoding = kwargs.get('encoding')
        if not encoding:
            encoding = getattr(sys.stdout, 'encoding', None) or 'utf8'
        self.encoding = encoding
        self.kwargs = kwargs
        if 'sh' in kwargs:
            kwargs['shell'] = kwargs.pop('sh')
        if kwargs.get('stderr') == 1:
            kwargs['stderr'] = STDOUT
        if 'combine_stderr' in kwargs:
            kwargs.pop('combine_stderr')
            kwargs['stderr'] = STDOUT
        if 'pipe' in kwargs:
            if not kwargs.pop('pipe'):
                self._call_pipe()
        elif not self._pipe:
            self._call_pipe()

    def _call_pipe(self):
        """Run the command right away, cache its output and print stderr on
        failure."""
        self._done = True
        ret = self.__call__()
        if ret.failed:
            print(ret.stderr, file=sys.stderr)

    @property
    def returncodes(self):
        """A list of return codes of all processes launched by the pipe.
        The list is empty when every process returned 0."""
        for p in self.processes:
            p.wait()
        codes = [p.poll() for p in self.processes]
        if set(codes) == set([0]):
            return []
        return codes

    @property
    def failed(self):
        """True if one or more process failed. Runs the pipe."""
        output = self.__call__()
        return output.failed

    @property
    def succeeded(self):
        """True if all processes succeeded. Runs the pipe and returns its
        output when that output is not empty."""
        output = self.__call__()
        if output.succeeded:
            return output or True
        return False

    @property
    def stderr(self):
        """combined stderr of all processes"""
        if self._stderr is None:
            stderr = [p.stderr.read() for p in self.processes if p.stderr]
            output = b'\n'.join(stderr).strip()
            if not isinstance(output, str):
                output = output.decode(self.encoding, 'ignore')
            self._stderr = output
        return self._stderr

    @property
    def commands(self):
        """Commands of the pipeline, first stage first, ``self`` last."""
        cmds = [self]
        previous = self.previous
        while previous is not None:
            cmds.insert(0, previous)
            previous = previous.previous
        return cmds

    def command_line(self, shell=False):
        """Return this command as an argument list, or as a single string
        when ``shell`` is true."""
        args = []
        if self._cmd_args:
            args.extend(self._cmd_args)
            if 'sudo' in args:
                args[0:1] = [aliases.get('sudo')]
        binary = self._binary
        if self._cmd_args[:1] == ['ssh']:
            # the remote command travels as one single ssh argument
            cmd = '%s %s' % (binary, ' '.join(self.args))
            cmd = cmd.strip()
            if shell and any(i in cmd for i in '\'"*<>|& '):
                cmd = repr(str(cmd))
            args.append(cmd)
        else:
            import shlex
            args.extend(binary.split())
            for a in self.args:
                if isinstance(a, (list, tuple)):
                    args.extend(a)
                elif shell:
                    args.append(a)
                else:
                    args.extend(shlex.split(str(a)))
        args = [a for a in args if a]
        if shell:
            return ' '.join(args)
        return args

    @property
    def commands_line(self):
        """Printable ``a | b | c`` representation of the whole pipeline."""
        cmds = []
        for cmd in self.commands:
            if isinstance(cmd, Stdin):
                s = 'stdin'
            elif isinstance(cmd, PyPipe):
                s = '%s()' % cmd.__class__.__name__
            elif cmd.kwargs.get("shell"):
                s = cmd.command_line(shell=True)
            else:
                args = []
                for arg in cmd.command_line():
                    if any(i in arg for i in '\'"*<>|& '):
                        args.append(repr(str(arg)))
                    else:
                        args.append(arg)
                s = " ".join(args)
            cmds.append(s.strip())
        return str(' | '.join(cmds))

    def bg(self):
        """Run processes in background. Return the last piped Popen object"""
        p = None
        self.processes = []
        self._stderr = None
        stdin = sys.stdin
        cmds = self.commands
        if [c for c in cmds if c._cmd_args[:1] == ['sudo']]:
            check_sudo()
        for cmd in cmds:
            if isinstance(cmd, Stdin):
                stdin = cmd.iter_stdout
            elif isinstance(cmd, PyPipe):
                cmd.stdin = p.stdout
                stdin = cmd.iter_stdout
                p = cmd
            else:
                args = cmd.command_line(cmd.kwargs.get('shell', False))
                kwargs = dict(
                    stdin=stdin, stderr=PIPE,
                    stdout=PIPE
                )
                kwargs.update(cmd.kwargs)
                # ``encoding`` is only used to decode the output here; given
                # to Popen it would switch the pipes to text mode and break
                # the bytes handling in ``stderr``
                kwargs.pop('encoding', None)
                env_ = kwargs.pop('env', env)
                # env is left out of the debug trace
                log.debug('Popen(%r, **%r)', args, kwargs)
                kwargs['env'] = env_
                try:
                    p = Popen(args, **kwargs)
                except OSError:
                    self._raise()
                self.processes.append(p)
                stdin = p.stdout
        return p

    def execv(self):
        """Replace the current process with this command (``os.execve``).

        :raises OSError: when the binary cannot be located with ``which``.
        """
        cmd = self.command_line()
        binary = sh.which(cmd.pop(0))
        if binary:
            binary = str(binary)
            os.execve(binary, [binary] + cmd, env)
        else:
            raise OSError(binary)

    @property
    def stdout(self):
        """standard output of the pipe. A file descriptor or an iterator.
        Accessing it spawns the pipeline."""
        p = self.bg()
        if isinstance(p, PyPipe):
            return p.iter_stdout
        else:
            return p.stdout

    @classmethod
    def map(cls, args,
            pool_size=None, stop_on_failure=False, **kwargs):
        """Run a batch of the same command and manage a pool of processes for
        you.

        :param args: one entry per run; a list is expanded as positional
            arguments, anything else is the single argument.
        :param pool_size: maximum number of simultaneous processes
            (defaults to the CPU count).
        :param stop_on_failure: kill the running processes and raise
            ``OSError`` as soon as one command fails.
        :param kwargs: extra ``Popen`` keywords.

        Yields one ``Stdout`` per entry, in the order of ``args``.
        """
        kw = dict(
            stdin=sys.stdin, stderr=PIPE,
            stdout=PIPE
        )
        kw.update(kwargs)
        if pool_size is None:
            import multiprocessing
            pool_size = multiprocessing.cpu_count()
        # work on a copy so the caller's sequence is left untouched
        pending = list(args)
        results = [None] * len(pending)
        processes = []
        index = 0
        out_index = 0
        while pending or processes:
            if pending and len(processes) < pool_size:
                a = pending.pop(0)
                if not isinstance(a, list):
                    a = [a]
                cmd = cls(*a)
                a = cmd.command_line(cmd.kwargs.get('shell', False))
                processes.append((index, cmd, Popen(a, **kw)))
                index += 1
            # iterate over a snapshot: finished entries are removed below
            for entry in list(processes):
                i, cmd, p = entry
                result = p.poll()
                if result is None:
                    continue
                # decode so the Stdout holds text, not the repr of bytes
                output = Stdout(cmd._decode(p.stdout.read()))
                output.stderr = cmd._decode(
                    p.stderr.read() if p.stderr else b'')
                output.returncodes = [result]
                output.failed = bool(result)
                output.succeeded = not output.failed
                results[i] = output
                processes.remove(entry)
                # flush every result that is now available in input order
                while (out_index < len(results)
                       and results[out_index] is not None):
                    yield results[out_index]
                    out_index += 1
                if output.failed and stop_on_failure:
                    for _, _, other in processes:
                        if other.poll() is None:  # pragma: no cover
                            other.kill()
                    cmd._raise(output=output)
            time.sleep(.1)

    def __getitem__(self, item):
        """Return a copy of a sub-pipeline; only slices are accepted."""
        if not isinstance(item, slice):
            raise KeyError('You can only use slices')
        cmds = self.commands
        cmds = [deepcopy(cmd) for cmd in cmds[item]]
        return self._order(cmds)[-1]

    def __getslice__(self, start, stop):
        # Python 2 slicing protocol
        cmds = self.commands
        cmds = [deepcopy(cmd) for cmd in cmds[start:stop]]
        return self._order(cmds)[-1]

    def __iter__(self):
        """Run the pipe and yield decoded output lines without the trailing
        newline. Raises ``OSError`` at the end if a process failed."""
        eol = '\n'
        for line in self.stdout:
            yield self._decode(line).rstrip(eol)
        # inspect the processes that just ran; ``self.failed`` would spawn
        # the whole pipeline a second time
        output = self._get_stdout('')
        if output.failed:
            self._raise(output=output)

    def __call__(self, **kwargs):
        """Run the pipe and return its output as a ``Stdout``.

        ``shell``, ``combine_stderr`` and ``stderr`` keywords are applied to
        every command of the pipeline before running it.
        """
        if self._done and self._stdout is not None:
            return self._stdout
        for cmd in self.commands:
            if kwargs.get('shell'):
                cmd.kwargs['shell'] = True
            if kwargs.get('combine_stderr'):
                cmd.kwargs['stderr'] = STDOUT
            if kwargs.get('stderr'):
                cmd.kwargs['stderr'] = STDOUT
        stdout = self.stdout
        if stdout is not None:
            if hasattr(stdout, 'read'):
                output = stdout.read().rstrip()
            else:  # pragma: no cover
                output = b''.join(list(stdout)).rstrip()
            output = self._decode(output)
        else:
            output = ''
        output = self._get_stdout(output)
        if self._done:
            self._stdout = output
        return output

    def __str__(self):
        """Run the pipe and return its output; raise ``OSError`` on failure."""
        output = self.__call__()
        # use the status of this very run instead of ``self.failed``, which
        # would execute the pipeline again
        if output.failed:
            self._raise(output=output)
        return output

    def __gt__(self, filename):
        # ``pipe > target``: overwrite
        return self._write(filename, 'wb+')

    def __rshift__(self, filename):
        # ``pipe >> target``: append
        return self._write(filename, 'ab+')

    def __or__(self, other):
        """``a | b``: return the pipeline made of ``a`` followed by ``b``."""
        if isinstance(other.commands, property):
            # ``other`` is a Pipe class, not an instance
            other = other()
        if isinstance(self, Stdin):
            first = other.commands[0]
            first.previous = self
            return other
        cmds = deepcopy(self.commands) + deepcopy(other.commands)
        cmds = self._order(cmds)
        other = cmds[-1]
        return other

    def __bool__(self):
        return not self.failed

    __nonzero__ = __bool__

    def __deepcopy__(self, *args):
        return self.__class__(*self.args, **self.kwargs)

    def __repr__(self):
        return repr(self.commands_line)

    def _order(self, cmds):
        """Relink ``cmds`` so each one has its predecessor as ``previous``."""
        if cmds:
            cmds[0].previous = None
            for i in range(len(cmds) - 1, 0, -1):
                cmds[i].previous = cmds[i - 1]
        return cmds

    def _write_to(self, fd):
        """Run the pipe with its output going to ``fd``."""
        if not isinstance(self, PyPipe):
            self.kwargs['stdout'] = fd
            return self.__call__()
        else:
            for line in self.stdout:
                fd.write(line)
            return self._get_stdout('')

    def _write(self, filename, mode):
        """Redirect the output to ``filename``.

        ``0`` stands for /dev/null, ``1`` and ``2`` for the standard output
        and error streams; anything else is opened with ``mode``. Raises
        ``OSError`` when the pipe failed.
        """
        if filename in (0,):
            with open('/dev/null', 'ab') as fd:
                output = self._write_to(fd)
        elif filename in (1, 2):
            if filename == 2:
                fd = self._sys_stderr
            else:
                fd = self._sys_stdout
            output = self._write_to(fd)
        else:
            with open(filename, mode) as fd:
                output = self._write_to(fd)
        if output.failed:
            self._raise(output=output)
        return output

    def _decode(self, output):
        """Return ``output`` as ``str``, decoding with ``self.encoding``."""
        if not isinstance(output, str):
            output = output.decode(self.encoding)
        return output

    def _get_stdout(self, stdout):
        """Wrap ``stdout`` in a ``Stdout`` carrying stderr and the status of
        the processes."""
        if not isinstance(stdout, str):
            if isinstance(stdout, bytes):
                # Python 3 bytes must be decoded to become a str
                stdout = stdout.decode(self.encoding)
            else:
                # Python 2 unicode must be encoded to become a str
                stdout = stdout.encode(self.encoding)
        output = Stdout(stdout)
        output.stderr = self.stderr
        output.returncodes = self.returncodes
        output.failed = bool(output.returncodes)
        output.succeeded = not output.failed
        return output

    def _raise(self, output=None):
        """Log stderr, if any, and raise ``OSError`` for this pipeline."""
        if not log.handlers:
            logging.basicConfig(stream=sys.stderr)
        if output is not None:
            if output.stderr:
                log.error(output.stderr)
            raise OSError(self.commands_line, output.stderr)
        raise OSError(self.commands_line)
class Stdin(Pipe):
    """Used to inject some data in the pipe"""

    # a Stdin stage never fails and has no error output
    stderr = ''
    returncodes = []

    def __init__(self, value):
        """``value`` is text, bytes or a file-like object."""
        super(Stdin, self).__init__()
        self.value = value
        self._stdin = None

    @staticmethod
    def _has_fileno(obj):
        """True when ``obj`` is backed by a real file descriptor."""
        try:
            obj.fileno()
        except (AttributeError, OSError, ValueError):
            # in-memory streams expose fileno() but raise when it is called
            return False
        return True

    def _to_bytes(self, data):
        """Return ``data`` as bytes, encoding text with ``self.encoding``."""
        if isinstance(data, bytes):
            return data
        return data.encode(self.encoding)

    @property
    def iter_stdout(self):
        """Something usable as ``Popen(stdin=...)``: the file object itself
        when it has a file descriptor, otherwise the read end of an OS pipe
        filled with the data."""
        source = self.value
        if hasattr(source, 'seek'):
            source.seek(0)
        if self._has_fileno(source):
            return source
        if hasattr(source, 'read'):
            data = source.read()
        else:
            data = source
        r, w = os.pipe()
        # NOTE: everything is written before a reader exists, so the payload
        # has to fit in the OS pipe buffer
        with os.fdopen(w, 'wb') as fd:
            fd.write(self._to_bytes(data))
        return r

    def __deepcopy__(self, *args):
        return self.__class__(self.value)

    def _write(self, filename, mode):
        """Copy the data to ``filename`` opened with ``mode``."""
        with open(filename, mode) as fd:
            if hasattr(self.value, 'seek'):
                self.value.seek(0)
            if hasattr(self.value, 'read'):
                shutil.copyfileobj(self.value, fd)
            else:
                # the target is opened in binary mode
                fd.write(self._to_bytes(self.value))
        return self._get_stdout('')
class Stdout(str):
    """A string with extra attributes:

    - succeeded
    - failed
    - stdout
    - stderr

    ``stdout`` is the string itself; the other attributes are assigned by
    the code creating the instance.
    """

    @property
    def stdout(self):
        return self
class PyPipe(Pipe):
    """Pipeline stage backed by the Python callable stored as ``func``."""

    @property
    def iter_stdout(self):
        """Result of calling ``func`` with this stage's ``stdin``."""
        func = self.func
        return func(self.stdin)

    def __deepcopy__(self, *args):
        """Copy by wrapping the same callable again through ``sh.wraps``."""
        func = self.func
        return sh.wraps(func)
class Base(object):
    """Factory of Pipe classes: ``obj.<command>`` and ``obj['<command>']``
    return a Pipe subclass bound to that command."""

    not_piped = [str(c) for c in __not_piped__]

    def __init__(self, name, *cmd_args):
        self.__name__ = name
        # command name -> generated Pipe subclass
        self._cmds = {}
        # prefix put in front of each generated command; empty without extras
        self._cmd_args = [name] + list(cmd_args) if cmd_args else []

    def set_debug(self, enable=True):
        """Switch the library logger between DEBUG and INFO; when enabling,
        a stdout handler is attached if the logger has none."""
        if not enable:
            log.setLevel(logging.INFO)
            return
        log.setLevel(logging.DEBUG)
        if not log.handlers:
            log.addHandler(logging.StreamHandler(sys.stdout))

    def __getattr__(self, attr):
        key = str(attr)
        if key in self._cmds:
            return self._cmds[key]
        namespace = {
            '_chut': self,
            '_binary': str(aliases.get(key, key)),
            '_cmd_args': self._cmd_args,
            # commands listed in not_piped run as soon as instantiated
            '_pipe': key not in self.not_piped,
        }
        self._cmds[key] = type(key, (Pipe,), namespace)
        return self._cmds[key]

    __getitem__ = __getattr__

    def __repr__(self):
        return '<%s>' % self.__name__
class Chut(Base):
    """Command factory with a few extra helpers (path, cd, stdin, ssh...)."""

    path = Path()

    def wraps(self, func):
        """Return a PyPipe stage whose ``func`` is ``func``."""
        stage_class = type(func.__name__, (PyPipe,),
                           {'func': staticmethod(func)})
        return stage_class()

    @contextmanager
    def pipes(self, cmd):
        """Yield ``cmd``; on exit, log and raise ``OSError`` if it reports
        return codes."""
        try:
            yield cmd
        finally:
            codes = cmd.returncodes
            if codes:
                stderr = cmd.stderr
                log.error('Error while running %r\n%s', cmd, stderr)
                raise OSError(stderr)

    def pipe(self, binary, *args, **kwargs):
        """Instantiate the Pipe class of ``binary`` with the arguments."""
        return getattr(self, str(binary))(*args, **kwargs)

    def cd(self, directory):
        """Change the current directory"""
        return ChangeDir(directory)

    def pwd(self):
        """return os.path.abspath(os.getcwd())"""
        current = os.getcwd()
        return os.path.abspath(current)

    def stdin(self, value):
        """Wrap ``value`` in a Stdin stage."""
        return Stdin(value)

    def ssh(self, *args):
        """Return an SSH command factory built from ``args``."""
        return SSH('ssh', *args)
class ChangeDir:
    """Move to another directory and remember the previous one so it can be
    restored. Meant for a ``with`` statement; the change itself happens at
    construction time."""

    def __init__(self, dir):
        target = os.path.realpath(dir)
        self._dir = target
        self._prevdir = env.pwd
        os.chdir(target)
        env.pwd = target

    def __enter__(self):
        return self._dir

    def __exit__(self, type, value, traceback):
        previous = self._prevdir
        os.chdir(previous)
        env.pwd = previous
class Command(Base):
    """A command (like test). ``cmd.x`` gives a Pipe class whose command
    arguments are ``[<name>, '-x']``."""

    def __getattr__(self, attr):
        flag = str(attr)
        if flag in self._cmds:
            return self._cmds[flag]
        cmd = self.__name__
        namespace = {
            '_chut': self,
            '_binary': '',
            '_cmd_args': [cmd, '-' + flag],
            '_pipe': True,
        }
        self._cmds[flag] = type(str(cmd), (Pipe,), namespace)
        return self._cmds[flag]
class SSH(Base):
    """A ssh server"""

    def join(self, *args):
        """Return ``host:"<escaped path>"`` for the joined ``args``; bytes
        input gives a bytes result."""
        joined = posixpath.join(*args)
        prefix, quote = str(self), '"'
        if isinstance(joined, bytes):
            prefix, quote = prefix.encode('ascii'), quote.encode('ascii')
        return prefix + quote + escape(joined) + quote

    def cd(self, *args):
        raise NotImplementedError('cd does not work with ssh')

    def pwd(self):
        raise NotImplementedError('pwd does not work with ssh')

    @property
    def host(self):
        """Last of the ssh arguments."""
        return self._cmd_args[-1]

    def __str__(self):
        return '%s:' % self.host

    def __call__(self, *args, **kwargs):
        """Build the remote command; Pipe arguments are replaced by their
        printable command line."""
        cmds = [a.commands_line if isinstance(a, Pipe) else a for a in args]
        server = SSH(aliases.get('ssh'), *self._cmd_args[1:])
        # the empty attribute name gives a Pipe class with no binary of its own
        srv = getattr(server, '')
        return srv(*cmds, **kwargs)
class ModuleWrapper(types.ModuleType):
    """Module object exposing the attributes of ``mod`` and falling back on
    those of ``chut``."""

    def __init__(self, mod, chut, name):
        self.__name__ = name
        # mirror the usual module metadata, None when the module lacks it
        for attr in ("__builtins__", "__doc__", "__package__", "__file__"):
            setattr(self, attr, getattr(mod, attr, None))
        self.__path__ = getattr(mod, '__path__', [])
        self.__test__ = getattr(mod, '__test__', {})
        self.mod = mod
        self.chut = chut

    def __getattr__(self, attr):
        if attr == '__wrapped__':
            raise AttributeError()
        if attr == '__all__':
            if self.__name__ != 'chut':  # pragma: no cover
                raise ImportError(
                    'You cant import things that does not exist')
            return [str(c) for c in __all__]
        # attributes missing from the module, or set to None, come from chut
        found = getattr(self.mod, attr, None)
        if found is None:
            return getattr(self.chut, attr)
        return found

    __getitem__ = __getattr__
# Logging front-end and shortcuts; each shortcut forwards its arguments to
# the same-named method of the logger returned by ``logopts()``.
logopts = Log()
info = logopts.log('info')
debug = logopts.log('debug')
error = logopts.log('error')
warn = logopts.log('warn')
exc = logopts.log('exception')
# Copy of the process environment with attribute access (see Environ).
env = Environ(os.environ.copy())
# Default command factory.
sh = Chut('sh')
# Commands built from this factory carry ['sudo', '-s'] as command prefix.
sudo = Chut('sudo', '-s')
test = Command('test')
# Short alias of escape().
e = escape
def wraps_module(mod):
    """Register ModuleWrapper objects for ``mod`` in ``sys.modules`` under
    ``chut`` (backed by ``sh``) and ``chut.sudo`` (backed by ``sudo``)."""
    registrations = (
        ('chut', sh, 'chut'),
        ('chut.sudo', sudo, 'sudo'),
    )
    for key, chut, name in registrations:
        sys.modules[key] = ModuleWrapper(mod, chut, name)
#####################
# Script generation #
#####################
def requires(*requirements, **kwargs):
    """Add extra dependencies in a virtualenv.

    The virtualenv is the tox one when running under tox, else the active
    ``VIRTUAL_ENV``, else ``kwargs['venv']`` (default ``~/.chut/venv``). It
    is created when missing and ``requirements`` (plus ``chut``) are
    installed in it with pip. Unless ``CHUT_VIRTUALENV`` is already set, the
    current script is finally re-executed with the virtualenv's python.
    """
    if '/.tox/' in sys.executable:
        venv = os.path.dirname(os.path.dirname(sys.executable))
    elif env.virtual_env:  # pragma: no cover
        venv = env.chut_virtualenv = env.virtual_env
    else:  # pragma: no cover
        venv = os.path.expanduser(kwargs.get('venv', '~/.chut/venv'))
    if not env.pip_download_cache:  # pragma: no cover
        env.pip_download_cache = os.path.expanduser('~/.chut/cache')
        sh.mkdir('-p', env.pip_download_cache)
    bin_dir = os.path.join(venv, 'bin')
    if bin_dir not in env.path:  # pragma: no cover
        env.path = [bin_dir] + env.path
    requirements = list(requirements)
    if 'chut' not in requirements:
        requirements.insert(0, 'chut')
    if not test.d(venv):  # pragma: no cover
        # urlretrieve lives in urllib.request on Python 3
        try:
            from urllib.request import urlretrieve
        except ImportError:
            from urllib import urlretrieve
        url = 'https://raw.github.com/pypa/virtualenv/master/virtualenv.py'
        urlretrieve(url, '/tmp/_virtualenv.py')
        sh[sys.executable]('-S /tmp/_virtualenv.py', venv) > 1
        sh.rm('/tmp/_virtualenv*', shell=True)
        info('Installing %s...' % ', '.join(requirements))
        sh.pip('install -qM', *requirements) > 1
    elif env.chut_virtualenv:
        upgrade = '--upgrade' in sys.argv
        if (env.chut_upgrade or upgrade):  # pragma: no cover
            # pretend nothing is installed so everything gets upgraded
            installed = ''
        else:
            installed = str(sh.pip('freeze')).lower()
        requirements = [r for r in requirements if r.lower() not in installed]
        if requirements:  # pragma: no cover
            info('Updating %s...' % ', '.join(requirements))
            sh.pip('install -qM --upgrade', *requirements) > 1
    executable = os.path.join(bin_dir, 'python')
    if not env.chut_virtualenv:  # pragma: no cover
        # mark the environment before re-executing so this runs only once
        env.chut_virtualenv = venv
        os.execve(executable, [executable] + sys.argv, env)
class console_script(object):
    """Decorator turning a function into a docopt-driven command line entry
    point. Usable bare (``@console_script``) or with options
    (``@console_script(fmt='brief')``)."""

    # (option, description) pairs substituted for ``%options`` in the usage
    options = (
        ('-q, --quiet', 'Quiet (No output)'),
        ('--debug', 'Debug mode (More output / pdb on failure)'),
        ('-v, --version', 'Show version'),
        ('-h, --help', 'Show this help'),
    )

    def __init__(self, *args, **opts):
        self._console_script = True
        self.logopts = opts.pop('logopts', {})
        # fmt and stream are logging options, not docopt ones
        for key in ('fmt', 'stream'):
            if key in opts:
                self.logopts[key] = opts.pop(key)
        self.docopts = opts
        self.func = None
        self.doc = None
        self.wraps(args)

    def version(self):  # pragma: no cover
        """Print the script name, its ``__version__`` and the python
        version."""
        main_module = sys.modules['__main__']
        version = getattr(main_module, '__version__', 'unknown')
        py_version = sys.version.split(' ', 1)[0]
        print('%s %s runing on python %s' % (
            self.func.__name__, version, py_version))

    def _render_options(self, match):
        """Expand one ``%options`` / ``%options-<width>s`` marker."""
        width = match.groups()[-1]
        if width is None:
            fmt = '{0:<20}{1}\n'
        else:
            fmt = '{0:<%s}{1}\n' % width.strip('-s')
        return ''.join([fmt.format(*opt) for opt in self.options])

    def wraps(self, args):
        """Bind ``args[0]`` as the wrapped function and build the usage text
        from its docstring (or the ``doc`` option). Returns ``self``."""
        if not args:
            return self
        self.func = args[0]
        functools.wraps(self.func)(self)
        func = self.func
        self.docopts.setdefault('help', True)
        if 'doc' in self.docopts:
            doc = self.docopts.pop('doc')
        else:
            doc = getattr(func, '__doc__', None)
        if doc is None:
            doc = 'Usage: %prog'
        name = func.__name__.replace('_', '-')
        doc = doc.replace('%prog', name).strip()
        if '%options' in doc:
            doc = re.sub('(%options)(-[0-9]+s)*', self._render_options, doc)
        doc = doc.replace('\n ', '\n')
        self.doc = doc
        self.logopts.setdefault('name', name)
        return self

    def main(self, arguments=None):
        """Parse the command line and call the wrapped function.

        With a list as ``arguments`` it is used as argv and the function's
        result is returned; otherwise the process exits with that result.
        """
        import docopt
        ret = isinstance(arguments, list)
        if ret:
            self.docopts['argv'] = arguments
        arguments = docopt.docopt(self.doc, **self.docopts)
        if arguments.get('--version') is True:  # pragma: no cover
            res = self.version()
        else:
            logopts(arguments, **self.logopts)
            try:
                res = self.func(arguments)
            except KeyboardInterrupt:  # pragma: no cover
                sys.exit(1)
            except Exception:  # pragma: no cover
                if not arguments.get('--debug'):
                    raise
                info(('> Entering python debuger. '
                      'Use h for help, q to quit.'))
                import pdb
                pdb.post_mortem()
                return 1
        if ret:
            return res
        return sys.exit(res)

    def __call__(self, *args, **kwargs):
        # once a function is bound this is the entry point, before that it
        # is the decorator call
        if self.func:
            return self.main(*args, **kwargs)
        return self.wraps(args)
# Make the decorator reachable from Chut instances (e.g. ``sh.console_script``).
Chut.console_script = console_script
class Generator(object):
    """generate a script from a @console_script. args may contain some
    docopts like arguments"""

    # module name -> generated ``_chut_modules.append(...)`` line; shared by
    # all instances so each module is encoded once
    _modules = {}

    def __init__(self, **args):
        """Read the options (docopt style ``--x`` keys or plain keywords),
        create the destination directory and pre-encode the modules."""
        self.version = args.get('--new-version') or args.get('version')
        self.devel = args.get('--devel') or args.get('devel')
        if self.devel:
            dest = 'bin'
        else:
            dest = args.get('--destination') or args.get('destination')
            dest = os.path.expanduser(dest or 'dist/scripts')
        sh.mkdir('-p', dest)
        self.dest = dest
        args.update(version=repr(str(args.get('--version') or 'unknown')),
                    interpreter=args.get('--interpreter', 'python3'))
        self.args = args
        self.mods = self.encode_modules(*args.get('modules', []))

    def encode_module(self, mod):
        """Return the line of code embedding the compressed, base64 encoded
        source of ``mod`` (a module or a module name)."""
        if not hasattr(mod, '__file__'):
            mod = __import__(mod)
        name = str(mod.__name__)
        if name not in self._modules:
            data = inspect.getsource(mod)
            # encodestring() was removed in Python 3.9; encodebytes() is
            # missing on Python 2
            encode = getattr(base64, 'encodebytes', None)
            if encode is None:
                encode = base64.encodestring
            data = encode(zlib.compress(data.encode('utf8')))
            code = '_chut_modules.append((%r, %r))\n' % (name, data)
            self._modules[name] = code
        return self._modules[name]

    def encode_modules(self, *modules):
        """Return the code embedding the bundled modules plus ``modules``."""
        try:
            # check if the script is already chutified
            _chut_modules = sys.modules['__main__']._chut_modules
        except AttributeError:
            # get source from files
            modules = [
                'six', 'pathlib', 'docopt', 'ConfigObject',
                sys.modules[__name__]
            ] + list(modules)
            modules = ''.join([self.encode_module(m) for m in modules])
        else:  # pragma: no cover
            # get source from _chut_modules
            modules = ''
            for name, data in _chut_modules:
                modules += '_chut_modules.append((%r, %r))\n' % (name, data)
        return modules

    def generate(self, filename, args=None, **kwargs):
        """Write one executable script per console script found in
        ``filename`` and return the list of generated paths."""
        if args is None:
            args = {}
        args.update(kwargs)
        dirname = os.path.dirname(filename)
        mod_name = inspect.getmodulename(filename)
        os.environ.update(env)
        # the ``def`` line is searched in the 4 lines following the decorator
        console_scripts = list(sh.grep('-A4 -E @.*console_script', filename))
        console_scripts = [s for s in console_scripts if s.startswith('def ')]
        scripts = []
        loop = '--loop' in sys.argv or '-l' in sys.argv
        while not os.path.isfile(filename):  # pragma: no cover
            time.sleep(.1)
        mtime = os.stat(filename)[stat.ST_MTIME]
        if console_scripts and self.version:
            version = sh.grep('-E ^__version__', filename)
            if version:
                version = str(version).split('=')[1].strip('\'" ')
                if version != self.version:
                    info('bump %s version from %s to %s',
                         posixpath.basename(filename), version, self.version)
                    sh.sed((
                        '-i \'s/^__version__ =.*/__version__ = "%s"/\''
                    ) % self.version, filename, shell=True) > 1
        for func_name in sorted(set(console_scripts)):
            name = func_name[4:].split('(')[0]
            script = os.path.join(self.dest, name.replace('_', '-'))
            if os.path.isfile(script) and loop:  # pragma: no cover
                # in loop mode only regenerate scripts older than the source
                smtime = os.stat(script)[stat.ST_MTIME]
                if mtime <= smtime:
                    continue
            with open(script, 'w') as fd:
                fd.write(SCRIPT_HEADER % self.args + self.mods + LOAD_MODULES)
                if self.devel:
                    # devel scripts import the module from its directory
                    fd.write('sys.path.insert(0, "%s")\n' % dirname)
                    fd.write('import %s\n' % mod_name)
                    fd.write('__version__ = getattr(%s, ' % mod_name)
                    fd.write('"__version__", "unknown") + "-dev"\n')
                    fd.write('if __name__ == "__main__":\n')
                    fd.write(' %s.%s()\n' % (mod_name, name))
                else:
                    # otherwise the module source is inlined in the script
                    with open(filename) as mod:
                        fd.write(mod.read().replace('__main__',
                                                    '__chutified__'))
                    fd.write((
                        "\nif __name__ == '__main__':\n %s()\n"
                    ) % name)
            executable = sh.chmod('+x', script)
            if executable:
                info(executable.commands_line)
                scripts.append(script)
            else:  # pragma: no cover
                error('failed to generate %s' % script)
        return scripts

    def __call__(self, location):
        """Generate the scripts of a file, or of every matching ``.py`` file
        under a directory (site-packages excluded)."""
        scripts = []
        if os.path.isfile(location):
            scripts.extend(self.generate(location))
        elif os.path.isdir(location):
            filenames = sh.grep('-lRE --include=*.py @.*console_script',
                                location) | sh.grep('-v site-packages')
            for filename in sorted(filenames):
                scripts.extend(self.generate(filename))
        return scripts
# First part of every generated script; %-formatted with the generator
# arguments (``interpreter`` and ``version``).
SCRIPT_HEADER = '''
#!/usr/bin/env %(interpreter)s
# This script is generated with chut. Do NOT edit this file.
# All your changes will be lost at the next generation.
version = %(version)s
import base64, json, types, zlib, sys, os
os.environ['CHUTIFIED'] = '1'
PY3 = sys.version_info[0] == 3
_chut_modules = []
'''.lstrip()

# Loader appended after the embedded modules: decodes each entry of
# ``_chut_modules``, executes it in a fresh namespace and registers the
# resulting module. The text is Python source, so its indentation matters.
LOAD_MODULES = '''
for name, code in _chut_modules:
    if PY3:
        if isinstance(code, str):
            code = code.encode('utf-8')
        code = zlib.decompress(base64.decodebytes(code))
    else:
        name = bytes(name)
        code = zlib.decompress(base64.decodestring(code))
    mod = types.ModuleType(name)
    globs = dict()
    if PY3:
        if isinstance(code, bytes):
            code = code.decode('utf-8')
        exec(code, globs)
    else:
        exec('exec code in globs')
    mod.__dict__.update(globs)
    if name == 'chut':
        mod.wraps_module(mod)
    else:
        sys.modules[name] = mod
from chut import env
'''.lstrip()
##########
# Fabric #
##########
class Fab(object):
    """Generate chut scripts and run them on remote hosts through fabric."""

    # name of the remote directory (under the remote home) receiving scripts
    dirname = '.chutifab'
    # script names found by the last chutifab() call
    scripts = []

    def _run(self, meth, script, *args, **kwargs):  # pragma: no cover
        """Upload ``script`` and run it with the fabric operation ``meth``
        (``'run'`` or ``'sudo'``). Returns the operation's result, or None
        when fabric is disabled."""
        res = None
        if script not in self.scripts:
            scripts = sorted(sh.ls('.chutifab'))
            fabric.abort((
                'No such script {0}. Available scripts are:\n\n- {1}'
            ).format(script, '\n- '.join(scripts)))
        if HAS_FABRIC:
            meth = getattr(fabric, meth)
            with fabric.settings(fabric.hide('stdout', 'running')):
                # make sure the remote directory exists and get its path
                res = meth(('test -d ~/{0} || mkdir ~/{0} && chmod 700 ~/{0}; '
                            'echo $HOME/{0}').format(self.dirname))
            remote = posixpath.join(res, script)
            fabric.put('.chutifab/' + script, remote, mode=0o700,
                       use_sudo=bool(meth.__name__ == 'sudo'))
            cmd = '{0} {1}'.format(remote, ' '.join(args))
            res = meth(cmd, **kwargs)
        return res

    def chutifab(self, *args):
        """Generate chut scripts contained in location"""
        ll = logging.getLogger(posixpath.basename(sys.argv[0]))
        level = ll.level
        # keep the generator quiet while scripts are produced
        ll.setLevel(logging.WARN)
        if not args:
            args = ['.']
        try:
            for location in args:
                Generator(destination='.chutifab')(location)
        finally:
            # put the level back even when the generation fails
            ll.setLevel(level)
        self.scripts = sorted(sh.ls('.chutifab'))
        return self.scripts

    def run(self, script, *args, **kwargs):
        """Upload a script and run it. ``*args`` are used as command line
        arguments. ``**kwargs`` are passed to `fabric`'s `run`"""
        return self._run('run', script, *args, **kwargs)

    def sudo(self, script, *args, **kwargs):
        """Upload a script and run it using sudo. ``*args`` are used as command
        line arguments. ``**kwargs`` are passed to `fabric`'s `sudo`"""
        return self._run('sudo', script, *args, **kwargs)
fab = Fab()

# When this file is imported rather than run as a script, register the
# ModuleWrapper objects under ``chut`` and ``chut.sudo`` in sys.modules.
if __name__ != '__main__':
    mod = sys.modules[__name__]
    wraps_module(mod)