Viewing file:
test_runner.py (29.31 KB) -rw-r--r--Select action/file-type:

(
+) |

(
+) |

(
+) |
Code (
+) |
Session (
+) |

(
+) |
SDB (
+) |

(
+) |

(
+) |

(
+) |

(
+) |

(
+) |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
#
# Maintainer: Jonathan Lange
# Author: Robert Collins
import StringIO, os, sys
from zope.interface import implements
from zope.interface.verify import verifyObject
from twisted.trial.itrial import IReporter, ITestCase
from twisted.trial import unittest, runner, reporter, util
from twisted.python import failure, log, reflect, filepath
from twisted.python.filepath import FilePath
from twisted.scripts import trial
from twisted.plugins import twisted_trial
from twisted import plugin
from twisted.internet import defer
pyunit = __import__('unittest')
class CapturingDebugger(object):
def __init__(self):
self._calls = []
def runcall(self, *args, **kwargs):
self._calls.append('runcall')
args[0](*args[1:], **kwargs)
class CapturingReporter(object):
"""
Reporter that keeps a log of all actions performed on it.
"""
implements(IReporter)
stream = None
tbformat = None
args = None
separator = None
testsRun = None
def __init__(self, stream=None, tbformat=None, rterrors=None,
publisher=None):
"""
Create a capturing reporter.
"""
self._calls = []
self.shouldStop = False
self._stream = stream
self._tbformat = tbformat
self._rterrors = rterrors
self._publisher = publisher
def startTest(self, method):
"""
Report the beginning of a run of a single test method
@param method: an object that is adaptable to ITestMethod
"""
self._calls.append('startTest')
def stopTest(self, method):
"""
Report the status of a single test method
@param method: an object that is adaptable to ITestMethod
"""
self._calls.append('stopTest')
def cleanupErrors(self, errs):
"""called when the reactor has been left in a 'dirty' state
@param errs: a list of L{twisted.python.failure.Failure}s
"""
self._calls.append('cleanupError')
def addSuccess(self, test):
self._calls.append('addSuccess')
def done(self):
"""
Do nothing. These tests don't care about done.
"""
class TrialRunnerTestsMixin:
"""
Mixin defining tests for L{runner.TrialRunner}.
"""
def tearDown(self):
self.runner._tearDownLogFile()
def test_empty(self):
"""
Empty test method, used by the other tests.
"""
def _getObservers(self):
return log.theLogPublisher.observers
def test_addObservers(self):
"""
Any log system observers L{TrialRunner.run} adds are removed by the
time it returns.
"""
originalCount = len(self._getObservers())
self.runner.run(self.test)
newCount = len(self._getObservers())
self.assertEqual(newCount, originalCount)
def test_logFileAlwaysActive(self):
"""
Test that a new file is opened on each run.
"""
oldSetUpLogFile = self.runner._setUpLogFile
l = []
def setUpLogFile():
oldSetUpLogFile()
l.append(self.runner._logFileObserver)
self.runner._setUpLogFile = setUpLogFile
self.runner.run(self.test)
self.runner.run(self.test)
self.failUnlessEqual(len(l), 2)
self.failIf(l[0] is l[1], "Should have created a new file observer")
def test_logFileGetsClosed(self):
"""
Test that file created is closed during the run.
"""
oldSetUpLogFile = self.runner._setUpLogFile
l = []
def setUpLogFile():
oldSetUpLogFile()
l.append(self.runner._logFileObject)
self.runner._setUpLogFile = setUpLogFile
self.runner.run(self.test)
self.failUnlessEqual(len(l), 1)
self.failUnless(l[0].closed)
class TestTrialRunner(TrialRunnerTestsMixin, unittest.TestCase):
"""
Tests for L{runner.TrialRunner} with the feature to turn unclean errors
into warnings disabled.
"""
def setUp(self):
self.stream = StringIO.StringIO()
self.runner = runner.TrialRunner(CapturingReporter, stream=self.stream)
self.test = TestTrialRunner('test_empty')
def test_publisher(self):
"""
The reporter constructed by L{runner.TrialRunner} is passed
L{twisted.python.log} as the value for the C{publisher} parameter.
"""
result = self.runner._makeResult()
self.assertIdentical(result._publisher, log)
class TrialRunnerWithUncleanWarningsReporter(TrialRunnerTestsMixin,
unittest.TestCase):
"""
Tests for the TrialRunner's interaction with an unclean-error suppressing
reporter.
"""
def setUp(self):
self.stream = StringIO.StringIO()
self.runner = runner.TrialRunner(CapturingReporter, stream=self.stream,
uncleanWarnings=True)
self.test = TestTrialRunner('test_empty')
class DryRunMixin(object):
suppress = [util.suppress(
category=DeprecationWarning,
message="Test visitors deprecated in Twisted 8.0")]
def setUp(self):
self.log = []
self.stream = StringIO.StringIO()
self.runner = runner.TrialRunner(CapturingReporter,
runner.TrialRunner.DRY_RUN,
stream=self.stream)
self.makeTestFixtures()
def makeTestFixtures(self):
"""
Set C{self.test} and C{self.suite}, where C{self.suite} is an empty
TestSuite.
"""
def test_empty(self):
"""
If there are no tests, the reporter should not receive any events to
report.
"""
result = self.runner.run(runner.TestSuite())
self.assertEqual(result._calls, [])
def test_singleCaseReporting(self):
"""
If we are running a single test, check the reporter starts, passes and
then stops the test during a dry run.
"""
result = self.runner.run(self.test)
self.assertEqual(result._calls, ['startTest', 'addSuccess', 'stopTest'])
def test_testsNotRun(self):
"""
When we are doing a dry run, the tests should not actually be run.
"""
self.runner.run(self.test)
self.assertEqual(self.log, [])
class DryRunTest(DryRunMixin, unittest.TestCase):
"""
Check that 'dry run' mode works well with Trial tests.
"""
def makeTestFixtures(self):
class MockTest(unittest.TestCase):
def test_foo(test):
self.log.append('test_foo')
self.test = MockTest('test_foo')
self.suite = runner.TestSuite()
class PyUnitDryRunTest(DryRunMixin, unittest.TestCase):
"""
Check that 'dry run' mode works well with stdlib unittest tests.
"""
def makeTestFixtures(self):
class PyunitCase(pyunit.TestCase):
def test_foo(self):
pass
self.test = PyunitCase('test_foo')
self.suite = pyunit.TestSuite()
class TestRunner(unittest.TestCase):
def setUp(self):
self.config = trial.Options()
# whitebox hack a reporter in, because plugins are CACHED and will
# only reload if the FILE gets changed.
parts = reflect.qual(CapturingReporter).split('.')
package = '.'.join(parts[:-1])
klass = parts[-1]
plugins = [twisted_trial._Reporter(
"Test Helper Reporter",
package,
description="Utility for unit testing.",
longOpt="capturing",
shortOpt=None,
klass=klass)]
# XXX There should really be a general way to hook the plugin system
# for tests.
def getPlugins(iface, *a, **kw):
self.assertEqual(iface, IReporter)
return plugins + list(self.original(iface, *a, **kw))
self.original = plugin.getPlugins
plugin.getPlugins = getPlugins
self.standardReport = ['startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest',
'startTest', 'addSuccess', 'stopTest']
def tearDown(self):
plugin.getPlugins = self.original
def parseOptions(self, args):
self.config.parseOptions(args)
def getRunner(self):
r = trial._makeRunner(self.config)
r.stream = StringIO.StringIO()
# XXX The runner should always take care of cleaning this up itself.
# It's not clear why this is necessary. The runner always tears down
# its log file.
self.addCleanup(r._tearDownLogFile)
# XXX The runner should always take care of cleaning this up itself as
# well. It's necessary because TrialRunner._setUpTestdir might raise
# an exception preventing Reporter.done from being run, leaving the
# observer added by Reporter.__init__ still present in the system.
# Something better needs to happen inside
# TrialRunner._runWithoutDecoration to remove the need for this cludge.
r._log = log.LogPublisher()
return r
def test_runner_can_get_reporter(self):
self.parseOptions([])
result = self.config['reporter']
runner = self.getRunner()
self.assertEqual(result, runner._makeResult().__class__)
def test_runner_get_result(self):
self.parseOptions([])
runner = self.getRunner()
result = runner._makeResult()
self.assertEqual(result.__class__, self.config['reporter'])
def test_uncleanWarningsOffByDefault(self):
"""
By default Trial sets the 'uncleanWarnings' option on the runner to
False. This means that dirty reactor errors will be reported as
errors. See L{test_reporter.TestDirtyReactor}.
"""
self.parseOptions([])
runner = self.getRunner()
self.assertNotIsInstance(runner._makeResult(),
reporter.UncleanWarningsReporterWrapper)
def test_getsUncleanWarnings(self):
"""
Specifying '--unclean-warnings' on the trial command line will cause
reporters to be wrapped in a device which converts unclean errors to
warnings. See L{test_reporter.TestDirtyReactor} for implications.
"""
self.parseOptions(['--unclean-warnings'])
runner = self.getRunner()
self.assertIsInstance(runner._makeResult(),
reporter.UncleanWarningsReporterWrapper)
def test_runner_working_directory(self):
self.parseOptions(['--temp-directory', 'some_path'])
runner = self.getRunner()
self.assertEquals(runner.workingDirectory, 'some_path')
def test_concurrentImplicitWorkingDirectory(self):
"""
If no working directory is explicitly specified and the default
working directory is in use by another runner, L{TrialRunner.run}
selects a different default working directory to use.
"""
self.parseOptions([])
# Make sure we end up with the same working directory after this test
# as we had before it.
self.addCleanup(os.chdir, os.getcwd())
# Make a new directory and change into it. This isolates us from state
# that other tests might have dumped into this process's temp
# directory.
runDirectory = FilePath(self.mktemp())
runDirectory.makedirs()
os.chdir(runDirectory.path)
firstRunner = self.getRunner()
secondRunner = self.getRunner()
where = {}
class ConcurrentCase(unittest.TestCase):
def test_first(self):
"""
Start a second test run which will have a default working
directory which is the same as the working directory of the
test run already in progress.
"""
# Change the working directory to the value it had before this
# test suite was started.
where['concurrent'] = subsequentDirectory = os.getcwd()
os.chdir(runDirectory.path)
self.addCleanup(os.chdir, subsequentDirectory)
secondRunner.run(ConcurrentCase('test_second'))
def test_second(self):
"""
Record the working directory for later analysis.
"""
where['record'] = os.getcwd()
result = firstRunner.run(ConcurrentCase('test_first'))
bad = result.errors + result.failures
if bad:
self.fail(bad[0][1])
self.assertEqual(
where, {
'concurrent': runDirectory.child('_trial_temp').path,
'record': runDirectory.child('_trial_temp-1').path})
def test_concurrentExplicitWorkingDirectory(self):
"""
If a working directory which is already in use is explicitly specified,
L{TrialRunner.run} raises L{_WorkingDirectoryBusy}.
"""
self.parseOptions(['--temp-directory', os.path.abspath(self.mktemp())])
initialDirectory = os.getcwd()
self.addCleanup(os.chdir, initialDirectory)
firstRunner = self.getRunner()
secondRunner = self.getRunner()
class ConcurrentCase(unittest.TestCase):
def test_concurrent(self):
"""
Try to start another runner in the same working directory and
assert that it raises L{_WorkingDirectoryBusy}.
"""
self.assertRaises(
util._WorkingDirectoryBusy,
secondRunner.run, ConcurrentCase('test_failure'))
def test_failure(self):
"""
Should not be called, always fails.
"""
self.fail("test_failure should never be called.")
result = firstRunner.run(ConcurrentCase('test_concurrent'))
bad = result.errors + result.failures
if bad:
self.fail(bad[0][1])
def test_runner_normal(self):
self.parseOptions(['--temp-directory', self.mktemp(),
'--reporter', 'capturing',
'twisted.trial.test.sample'])
my_runner = self.getRunner()
loader = runner.TestLoader()
suite = loader.loadByName('twisted.trial.test.sample', True)
result = my_runner.run(suite)
self.assertEqual(self.standardReport, result._calls)
def test_runner_debug(self):
self.parseOptions(['--reporter', 'capturing',
'--debug', 'twisted.trial.test.sample'])
my_runner = self.getRunner()
debugger = CapturingDebugger()
def get_debugger():
return debugger
my_runner._getDebugger = get_debugger
loader = runner.TestLoader()
suite = loader.loadByName('twisted.trial.test.sample', True)
result = my_runner.run(suite)
self.assertEqual(self.standardReport, result._calls)
self.assertEqual(['runcall'], debugger._calls)
class RemoveSafelyTests(unittest.TestCase):
"""
Tests for L{_removeSafely}.
"""
def test_removeSafelyNoTrialMarker(self):
"""
If a path doesn't contain a node named C{"_trial_marker"}, that path is
not removed by L{runner._removeSafely} and a L{runner._NoTrialMarker}
exception is raised instead.
"""
directory = self.mktemp()
os.mkdir(directory)
dirPath = filepath.FilePath(directory)
self.assertRaises(util._NoTrialMarker, util._removeSafely, dirPath)
def test_removeSafelyRemoveFailsMoveSucceeds(self):
"""
If an L{OSError} is raised while removing a path in
L{runner._removeSafely}, an attempt is made to move the path to a new
name.
"""
def dummyRemove():
"""
Raise an C{OSError} to emulate the branch of L{runner._removeSafely}
in which path removal fails.
"""
raise OSError()
# Patch stdout so we can check the print statements in _removeSafely
out = StringIO.StringIO()
self.patch(sys, 'stdout', out)
# Set up a trial directory with a _trial_marker
directory = self.mktemp()
os.mkdir(directory)
dirPath = filepath.FilePath(directory)
dirPath.child('_trial_marker').touch()
# Ensure that path.remove() raises an OSError
dirPath.remove = dummyRemove
util._removeSafely(dirPath)
self.assertIn("could not remove FilePath", out.getvalue())
def test_removeSafelyRemoveFailsMoveFails(self):
"""
If an L{OSError} is raised while removing a path in
L{runner._removeSafely}, an attempt is made to move the path to a new
name. If that attempt fails, the L{OSError} is re-raised.
"""
def dummyRemove():
"""
Raise an C{OSError} to emulate the branch of L{runner._removeSafely}
in which path removal fails.
"""
raise OSError("path removal failed")
def dummyMoveTo(path):
"""
Raise an C{OSError} to emulate the branch of L{runner._removeSafely}
in which path movement fails.
"""
raise OSError("path movement failed")
# Patch stdout so we can check the print statements in _removeSafely
out = StringIO.StringIO()
self.patch(sys, 'stdout', out)
# Set up a trial directory with a _trial_marker
directory = self.mktemp()
os.mkdir(directory)
dirPath = filepath.FilePath(directory)
dirPath.child('_trial_marker').touch()
# Ensure that path.remove() and path.moveTo() both raise OSErrors
dirPath.remove = dummyRemove
dirPath.moveTo = dummyMoveTo
error = self.assertRaises(OSError, util._removeSafely, dirPath)
self.assertEquals(str(error), "path movement failed")
self.assertIn("could not remove FilePath", out.getvalue())
class TestTrialSuite(unittest.TestCase):
def test_imports(self):
# FIXME, HTF do you test the reactor can be cleaned up ?!!!
from twisted.trial.runner import TrialSuite
class TestUntilFailure(unittest.TestCase):
class FailAfter(unittest.TestCase):
"""
A test case that fails when run 3 times in a row.
"""
count = []
def test_foo(self):
self.count.append(None)
if len(self.count) == 3:
self.fail('Count reached 3')
def setUp(self):
TestUntilFailure.FailAfter.count = []
self.test = TestUntilFailure.FailAfter('test_foo')
self.stream = StringIO.StringIO()
self.runner = runner.TrialRunner(reporter.Reporter, stream=self.stream)
def test_runUntilFailure(self):
"""
Test that the runUntilFailure method of the runner actually fail after
a few runs.
"""
result = self.runner.runUntilFailure(self.test)
self.failUnlessEqual(result.testsRun, 1)
self.failIf(result.wasSuccessful())
self.assertEquals(self._getFailures(result), 1)
def _getFailures(self, result):
"""
Get the number of failures that were reported to a result.
"""
return len(result.failures)
def test_runUntilFailureDecorate(self):
"""
C{runUntilFailure} doesn't decorate the tests uselessly: it does it one
time when run starts, but not at each turn.
"""
decorated = []
def decorate(test, interface):
decorated.append((test, interface))
return test
self.patch(unittest, "decorate", decorate)
result = self.runner.runUntilFailure(self.test)
self.failUnlessEqual(result.testsRun, 1)
self.assertEquals(len(decorated), 1)
self.assertEquals(decorated, [(self.test, ITestCase)])
def test_runUntilFailureForceGCDecorate(self):
"""
C{runUntilFailure} applies the force-gc decoration after the standard
L{ITestCase} decoration, but only one time.
"""
decorated = []
def decorate(test, interface):
decorated.append((test, interface))
return test
self.patch(unittest, "decorate", decorate)
self.runner._forceGarbageCollection = True
result = self.runner.runUntilFailure(self.test)
self.failUnlessEqual(result.testsRun, 1)
self.assertEquals(len(decorated), 2)
self.assertEquals(decorated,
[(self.test, ITestCase),
(self.test, unittest._ForceGarbageCollectionDecorator)])
class UncleanUntilFailureTests(TestUntilFailure):
"""
Test that the run-until-failure feature works correctly with the unclean
error suppressor.
"""
def setUp(self):
TestUntilFailure.setUp(self)
self.runner = runner.TrialRunner(reporter.Reporter, stream=self.stream,
uncleanWarnings=True)
def _getFailures(self, result):
"""
Get the number of failures that were reported to a result that
is wrapped in an UncleanFailureWrapper.
"""
return len(result._originalReporter.failures)
class BreakingSuite(runner.TestSuite):
"""
A L{TestSuite} that logs an error when it is run.
"""
def run(self, result):
try:
raise RuntimeError("error that occurs outside of a test")
except RuntimeError:
log.err(failure.Failure())
class TestLoggedErrors(unittest.TestCase):
"""
It is possible for an error generated by a test to be logged I{outside} of
any test. The log observers constructed by L{TestCase} won't catch these
errors. Here we try to generate such errors and ensure they are reported to
a L{TestResult} object.
"""
def tearDown(self):
self.flushLoggedErrors(RuntimeError)
def test_construct(self):
"""
Check that we can construct a L{runner.LoggedSuite} and that it
starts empty.
"""
suite = runner.LoggedSuite()
self.assertEqual(suite.countTestCases(), 0)
def test_capturesError(self):
"""
Chek that a L{LoggedSuite} reports any logged errors to its result.
"""
result = reporter.TestResult()
suite = runner.LoggedSuite([BreakingSuite()])
suite.run(result)
self.assertEqual(len(result.errors), 1)
self.assertEqual(result.errors[0][0].id(), runner.NOT_IN_TEST)
self.failUnless(result.errors[0][1].check(RuntimeError))
class TestTestHolder(unittest.TestCase):
def setUp(self):
self.description = "description"
self.holder = runner.TestHolder(self.description)
def test_holder(self):
"""
Check that L{runner.TestHolder} takes a description as a parameter
and that this description is returned by the C{id} and
C{shortDescription} methods.
"""
self.assertEqual(self.holder.id(), self.description)
self.assertEqual(self.holder.shortDescription(), self.description)
def test_holderImplementsITestCase(self):
"""
L{runner.TestHolder} implements L{ITestCase}.
"""
self.assertIdentical(self.holder, ITestCase(self.holder))
self.assertTrue(
verifyObject(ITestCase, self.holder),
"%r claims to provide %r but does not do so correctly."
% (self.holder, ITestCase))
def test_runsWithStandardResult(self):
"""
A L{runner.TestHolder} can run against the standard Python
C{TestResult}.
"""
result = pyunit.TestResult()
self.holder.run(result)
self.assertTrue(result.wasSuccessful())
self.assertEquals(1, result.testsRun)
class TestErrorHolder(TestTestHolder):
"""
Test L{runner.ErrorHolder} shares behaviour with L{runner.TestHolder}.
"""
def setUp(self):
self.description = "description"
# make a real Failure so we can construct ErrorHolder()
try:
1/0
except ZeroDivisionError:
error = failure.Failure()
self.holder = runner.ErrorHolder(self.description, error)
def test_runsWithStandardResult(self):
"""
A L{runner.ErrorHolder} can run against the standard Python
C{TestResult}.
"""
result = pyunit.TestResult()
self.holder.run(result)
self.assertFalse(result.wasSuccessful())
self.assertEquals(1, result.testsRun)
class TestMalformedMethod(unittest.TestCase):
"""
Test that trial manages when test methods don't have correct signatures.
"""
class ContainMalformed(unittest.TestCase):
"""
This TestCase holds malformed test methods that trial should handle.
"""
def test_foo(self, blah):
pass
def test_bar():
pass
test_spam = defer.deferredGenerator(test_bar)
def _test(self, method):
"""
Wrapper for one of the test method of L{ContainMalformed}.
"""
stream = StringIO.StringIO()
trialRunner = runner.TrialRunner(reporter.Reporter, stream=stream)
test = TestMalformedMethod.ContainMalformed(method)
result = trialRunner.run(test)
self.failUnlessEqual(result.testsRun, 1)
self.failIf(result.wasSuccessful())
self.failUnlessEqual(len(result.errors), 1)
def test_extraArg(self):
"""
Test when the method has extra (useless) arguments.
"""
self._test('test_foo')
def test_noArg(self):
"""
Test when the method doesn't have even self as argument.
"""
self._test('test_bar')
def test_decorated(self):
"""
Test a decorated method also fails.
"""
self._test('test_spam')
class DestructiveTestSuiteTestCase(unittest.TestCase):
"""
Test for L{runner.DestructiveTestSuite}.
"""
def test_basic(self):
"""
Thes destructive test suite should run the tests normally.
"""
called = []
class MockTest(unittest.TestCase):
def test_foo(test):
called.append(True)
test = MockTest('test_foo')
result = reporter.TestResult()
suite = runner.DestructiveTestSuite([test])
self.assertEquals(called, [])
suite.run(result)
self.assertEquals(called, [True])
self.assertEquals(suite.countTestCases(), 0)
def test_shouldStop(self):
"""
Test the C{shouldStop} management: raising a C{KeyboardInterrupt} must
interrupt the suite.
"""
called = []
class MockTest(unittest.TestCase):
def test_foo1(test):
called.append(1)
def test_foo2(test):
raise KeyboardInterrupt()
def test_foo3(test):
called.append(2)
result = reporter.TestResult()
loader = runner.TestLoader()
loader.suiteFactory = runner.DestructiveTestSuite
suite = loader.loadClass(MockTest)
self.assertEquals(called, [])
suite.run(result)
self.assertEquals(called, [1])
# The last test shouldn't have been run
self.assertEquals(suite.countTestCases(), 1)
def test_cleanup(self):
"""
Checks that the test suite cleanups its tests during the run, so that
it ends empty.
"""
class MockTest(unittest.TestCase):
def test_foo(test):
pass
test = MockTest('test_foo')
result = reporter.TestResult()
suite = runner.DestructiveTestSuite([test])
self.assertEquals(suite.countTestCases(), 1)
suite.run(result)
self.assertEquals(suite.countTestCases(), 0)
class TestRunnerDeprecation(unittest.TestCase):
class FakeReporter(reporter.Reporter):
"""
Fake reporter that does *not* implement done() but *does* implement
printErrors, separator, printSummary, stream, write and writeln
without deprecations.
"""
done = None
separator = None
stream = None
def printErrors(self, *args):
pass
def printSummary(self, *args):
pass
def write(self, *args):
pass
def writeln(self, *args):
pass
def test_reporterDeprecations(self):
"""
The runner emits a warning if it is using a result that doesn't
implement 'done'.
"""
trialRunner = runner.TrialRunner(None)
result = self.FakeReporter()
trialRunner._makeResult = lambda: result
def f():
# We have to use a pyunit test, otherwise we'll get deprecation
# warnings about using iterate() in a test.
trialRunner.run(pyunit.TestCase('id'))
self.assertWarns(
DeprecationWarning,
"%s should implement done() but doesn't. Falling back to "
"printErrors() and friends." % reflect.qual(result.__class__),
__file__, f)