testing.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. import os
  2. import sys
  3. import shutil
  4. import tempfile
  5. import contextlib
  6. from ._compat import iteritems, PY2
# If someone wants to vendor click, we want to ensure the
# correct package is discovered.  Ideally we could use a
# relative import here but unfortunately Python does not
# support that.
#
# Dropping the last dotted component of this module's ``__name__``
# yields the name of the enclosing package, which is then looked up
# in ``sys.modules``.
clickpkg = sys.modules[__name__.rsplit('.', 1)[0]]

# On Python 2 only ``StringIO`` (from ``cStringIO``) is imported;
# otherwise ``io`` and the ``_find_binary_reader`` helper are.
if PY2:
    from cStringIO import StringIO
else:
    import io
    from ._compat import _find_binary_reader
  17. class EchoingStdin(object):
  18. def __init__(self, input, output):
  19. self._input = input
  20. self._output = output
  21. def __getattr__(self, x):
  22. return getattr(self._input, x)
  23. def _echo(self, rv):
  24. self._output.write(rv)
  25. return rv
  26. def read(self, n=-1):
  27. return self._echo(self._input.read(n))
  28. def readline(self, n=-1):
  29. return self._echo(self._input.readline(n))
  30. def readlines(self):
  31. return [self._echo(x) for x in self._input.readlines()]
  32. def __iter__(self):
  33. return iter(self._echo(x) for x in self._input)
  34. def __repr__(self):
  35. return repr(self._input)
  36. def make_input_stream(input, charset):
  37. # Is already an input stream.
  38. if hasattr(input, 'read'):
  39. if PY2:
  40. return input
  41. rv = _find_binary_reader(input)
  42. if rv is not None:
  43. return rv
  44. raise TypeError('Could not find binary reader for input stream.')
  45. if input is None:
  46. input = b''
  47. elif not isinstance(input, bytes):
  48. input = input.encode(charset)
  49. if PY2:
  50. return StringIO(input)
  51. return io.BytesIO(input)
  52. class Result(object):
  53. """Holds the captured result of an invoked CLI script."""
  54. def __init__(self, runner, output_bytes, exit_code, exception,
  55. exc_info=None):
  56. #: The runner that created the result
  57. self.runner = runner
  58. #: The output as bytes.
  59. self.output_bytes = output_bytes
  60. #: The exit code as integer.
  61. self.exit_code = exit_code
  62. #: The exception that happend if one did.
  63. self.exception = exception
  64. #: The traceback
  65. self.exc_info = exc_info
  66. @property
  67. def output(self):
  68. """The output as unicode string."""
  69. return self.output_bytes.decode(self.runner.charset, 'replace') \
  70. .replace('\r\n', '\n')
  71. def __repr__(self):
  72. return '<Result %s>' % (
  73. self.exception and repr(self.exception) or 'okay',
  74. )
  75. class CliRunner(object):
  76. """The CLI runner provides functionality to invoke a Click command line
  77. script for unittesting purposes in a isolated environment. This only
  78. works in single-threaded systems without any concurrency as it changes the
  79. global interpreter state.
  80. :param charset: the character set for the input and output data. This is
  81. UTF-8 by default and should not be changed currently as
  82. the reporting to Click only works in Python 2 properly.
  83. :param env: a dictionary with environment variables for overriding.
  84. :param echo_stdin: if this is set to `True`, then reading from stdin writes
  85. to stdout. This is useful for showing examples in
  86. some circumstances. Note that regular prompts
  87. will automatically echo the input.
  88. """
  89. def __init__(self, charset=None, env=None, echo_stdin=False):
  90. if charset is None:
  91. charset = 'utf-8'
  92. self.charset = charset
  93. self.env = env or {}
  94. self.echo_stdin = echo_stdin
  95. def get_default_prog_name(self, cli):
  96. """Given a command object it will return the default program name
  97. for it. The default is the `name` attribute or ``"root"`` if not
  98. set.
  99. """
  100. return cli.name or 'root'
  101. def make_env(self, overrides=None):
  102. """Returns the environment overrides for invoking a script."""
  103. rv = dict(self.env)
  104. if overrides:
  105. rv.update(overrides)
  106. return rv
  107. @contextlib.contextmanager
  108. def isolation(self, input=None, env=None, color=False):
  109. """A context manager that sets up the isolation for invoking of a
  110. command line tool. This sets up stdin with the given input data
  111. and `os.environ` with the overrides from the given dictionary.
  112. This also rebinds some internals in Click to be mocked (like the
  113. prompt functionality).
  114. This is automatically done in the :meth:`invoke` method.
  115. .. versionadded:: 4.0
  116. The ``color`` parameter was added.
  117. :param input: the input stream to put into sys.stdin.
  118. :param env: the environment overrides as dictionary.
  119. :param color: whether the output should contain color codes. The
  120. application can still override this explicitly.
  121. """
  122. input = make_input_stream(input, self.charset)
  123. old_stdin = sys.stdin
  124. old_stdout = sys.stdout
  125. old_stderr = sys.stderr
  126. old_forced_width = clickpkg.formatting.FORCED_WIDTH
  127. clickpkg.formatting.FORCED_WIDTH = 80
  128. env = self.make_env(env)
  129. if PY2:
  130. sys.stdout = sys.stderr = bytes_output = StringIO()
  131. if self.echo_stdin:
  132. input = EchoingStdin(input, bytes_output)
  133. else:
  134. bytes_output = io.BytesIO()
  135. if self.echo_stdin:
  136. input = EchoingStdin(input, bytes_output)
  137. input = io.TextIOWrapper(input, encoding=self.charset)
  138. sys.stdout = sys.stderr = io.TextIOWrapper(
  139. bytes_output, encoding=self.charset)
  140. sys.stdin = input
  141. def visible_input(prompt=None):
  142. sys.stdout.write(prompt or '')
  143. val = input.readline().rstrip('\r\n')
  144. sys.stdout.write(val + '\n')
  145. sys.stdout.flush()
  146. return val
  147. def hidden_input(prompt=None):
  148. sys.stdout.write((prompt or '') + '\n')
  149. sys.stdout.flush()
  150. return input.readline().rstrip('\r\n')
  151. def _getchar(echo):
  152. char = sys.stdin.read(1)
  153. if echo:
  154. sys.stdout.write(char)
  155. sys.stdout.flush()
  156. return char
  157. default_color = color
  158. def should_strip_ansi(stream=None, color=None):
  159. if color is None:
  160. return not default_color
  161. return not color
  162. old_visible_prompt_func = clickpkg.termui.visible_prompt_func
  163. old_hidden_prompt_func = clickpkg.termui.hidden_prompt_func
  164. old__getchar_func = clickpkg.termui._getchar
  165. old_should_strip_ansi = clickpkg.utils.should_strip_ansi
  166. clickpkg.termui.visible_prompt_func = visible_input
  167. clickpkg.termui.hidden_prompt_func = hidden_input
  168. clickpkg.termui._getchar = _getchar
  169. clickpkg.utils.should_strip_ansi = should_strip_ansi
  170. old_env = {}
  171. try:
  172. for key, value in iteritems(env):
  173. old_env[key] = os.environ.get(key)
  174. if value is None:
  175. try:
  176. del os.environ[key]
  177. except Exception:
  178. pass
  179. else:
  180. os.environ[key] = value
  181. yield bytes_output
  182. finally:
  183. for key, value in iteritems(old_env):
  184. if value is None:
  185. try:
  186. del os.environ[key]
  187. except Exception:
  188. pass
  189. else:
  190. os.environ[key] = value
  191. sys.stdout = old_stdout
  192. sys.stderr = old_stderr
  193. sys.stdin = old_stdin
  194. clickpkg.termui.visible_prompt_func = old_visible_prompt_func
  195. clickpkg.termui.hidden_prompt_func = old_hidden_prompt_func
  196. clickpkg.termui._getchar = old__getchar_func
  197. clickpkg.utils.should_strip_ansi = old_should_strip_ansi
  198. clickpkg.formatting.FORCED_WIDTH = old_forced_width
  199. def invoke(self, cli, args=None, input=None, env=None,
  200. catch_exceptions=True, color=False, **extra):
  201. """Invokes a command in an isolated environment. The arguments are
  202. forwarded directly to the command line script, the `extra` keyword
  203. arguments are passed to the :meth:`~clickpkg.Command.main` function of
  204. the command.
  205. This returns a :class:`Result` object.
  206. .. versionadded:: 3.0
  207. The ``catch_exceptions`` parameter was added.
  208. .. versionchanged:: 3.0
  209. The result object now has an `exc_info` attribute with the
  210. traceback if available.
  211. .. versionadded:: 4.0
  212. The ``color`` parameter was added.
  213. :param cli: the command to invoke
  214. :param args: the arguments to invoke
  215. :param input: the input data for `sys.stdin`.
  216. :param env: the environment overrides.
  217. :param catch_exceptions: Whether to catch any other exceptions than
  218. ``SystemExit``.
  219. :param extra: the keyword arguments to pass to :meth:`main`.
  220. :param color: whether the output should contain color codes. The
  221. application can still override this explicitly.
  222. """
  223. exc_info = None
  224. with self.isolation(input=input, env=env, color=color) as out:
  225. exception = None
  226. exit_code = 0
  227. try:
  228. cli.main(args=args or (),
  229. prog_name=self.get_default_prog_name(cli), **extra)
  230. except SystemExit as e:
  231. if e.code != 0:
  232. exception = e
  233. exc_info = sys.exc_info()
  234. exit_code = e.code
  235. if not isinstance(exit_code, int):
  236. sys.stdout.write(str(exit_code))
  237. sys.stdout.write('\n')
  238. exit_code = 1
  239. except Exception as e:
  240. if not catch_exceptions:
  241. raise
  242. exception = e
  243. exit_code = -1
  244. exc_info = sys.exc_info()
  245. finally:
  246. sys.stdout.flush()
  247. output = out.getvalue()
  248. return Result(runner=self,
  249. output_bytes=output,
  250. exit_code=exit_code,
  251. exception=exception,
  252. exc_info=exc_info)
  253. @contextlib.contextmanager
  254. def isolated_filesystem(self):
  255. """A context manager that creates a temporary folder and changes
  256. the current working directory to it for isolated filesystem tests.
  257. """
  258. cwd = os.getcwd()
  259. t = tempfile.mkdtemp()
  260. os.chdir(t)
  261. try:
  262. yield t
  263. finally:
  264. os.chdir(cwd)
  265. try:
  266. shutil.rmtree(t)
  267. except (OSError, IOError):
  268. pass