Viewing file:
test_core.py (10.84 KB) -rw-r--r--Select action/file-type:

(
+) |

(
+) |

(
+) |
Code (
+) |
Session (
+) |

(
+) |
SDB (
+) |

(
+) |

(
+) |

(
+) |

(
+) |

(
+) |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for implementations of L{IReactorCore}.
"""
__metaclass__ = type
import signal
import time
import inspect
from twisted.internet.abstract import FileDescriptor
from twisted.internet.error import ReactorAlreadyRunning, ReactorNotRestartable
from twisted.internet.defer import Deferred
from twisted.internet.test.reactormixins import ReactorBuilder
class ObjectModelIntegrationMixin(object):
"""
Helpers for tests about the object model of reactor-related objects.
"""
def assertFullyNewStyle(self, instance):
"""
Assert that the given object is an instance of a new-style class and
that there are no classic classes in the inheritance hierarchy of
that class.
This is a beneficial condition because PyPy is better able to
optimize attribute lookup on such classes.
"""
self.assertIsInstance(instance, object)
mro = inspect.getmro(type(instance))
for subclass in mro:
self.assertTrue(
issubclass(subclass, object),
"%r is not new-style" % (subclass,))
class ObjectModelIntegrationTest(ReactorBuilder, ObjectModelIntegrationMixin):
"""
Test details of object model integration against all reactors.
"""
def test_newstyleReactor(self):
"""
Checks that all reactors on a platform have method resolution order
containing only new style classes.
"""
reactor = self.buildReactor()
self.assertFullyNewStyle(reactor)
class SystemEventTestsBuilder(ReactorBuilder):
"""
Builder defining tests relating to L{IReactorCore.addSystemEventTrigger}
and L{IReactorCore.fireSystemEvent}.
"""
def test_stopWhenNotStarted(self):
"""
C{reactor.stop()} raises L{RuntimeError} when called when the reactor
has not been started.
"""
reactor = self.buildReactor()
self.assertRaises(RuntimeError, reactor.stop)
def test_stopWhenAlreadyStopped(self):
"""
C{reactor.stop()} raises L{RuntimeError} when called after the reactor
has been stopped.
"""
reactor = self.buildReactor()
reactor.callWhenRunning(reactor.stop)
reactor.run()
self.assertRaises(RuntimeError, reactor.stop)
def test_callWhenRunningOrder(self):
"""
Functions are run in the order that they were passed to
L{reactor.callWhenRunning}.
"""
reactor = self.buildReactor()
events = []
reactor.callWhenRunning(events.append, "first")
reactor.callWhenRunning(events.append, "second")
reactor.callWhenRunning(reactor.stop)
reactor.run()
self.assertEqual(events, ["first", "second"])
def test_runningForStartupEvents(self):
"""
The reactor is not running when C{"before"} C{"startup"} triggers are
called and is running when C{"during"} and C{"after"} C{"startup"}
triggers are called.
"""
reactor = self.buildReactor()
state = {}
def beforeStartup():
state['before'] = reactor.running
def duringStartup():
state['during'] = reactor.running
def afterStartup():
state['after'] = reactor.running
reactor.addSystemEventTrigger("before", "startup", beforeStartup)
reactor.addSystemEventTrigger("during", "startup", duringStartup)
reactor.addSystemEventTrigger("after", "startup", afterStartup)
reactor.callWhenRunning(reactor.stop)
self.assertEqual(state, {})
reactor.run()
self.assertEqual(
state,
{"before": False,
"during": True,
"after": True})
def test_signalHandlersInstalledDuringStartup(self):
"""
Signal handlers are installed in responsed to the C{"during"}
C{"startup"}.
"""
reactor = self.buildReactor()
phase = [None]
def beforeStartup():
phase[0] = "before"
def afterStartup():
phase[0] = "after"
reactor.addSystemEventTrigger("before", "startup", beforeStartup)
reactor.addSystemEventTrigger("after", "startup", afterStartup)
sawPhase = [None]
def fakeSignal(signum, action):
sawPhase[0] = phase[0]
self.patch(signal, 'signal', fakeSignal)
reactor.callWhenRunning(reactor.stop)
self.assertEqual(phase[0], None)
self.assertEqual(sawPhase[0], None)
reactor.run()
self.assertEqual(sawPhase[0], "before")
self.assertEqual(phase[0], "after")
def test_stopShutDownEvents(self):
"""
C{reactor.stop()} fires all three phases of shutdown event triggers
before it makes C{reactor.run()} return.
"""
reactor = self.buildReactor()
events = []
reactor.addSystemEventTrigger(
"before", "shutdown",
lambda: events.append(("before", "shutdown")))
reactor.addSystemEventTrigger(
"during", "shutdown",
lambda: events.append(("during", "shutdown")))
reactor.addSystemEventTrigger(
"after", "shutdown",
lambda: events.append(("after", "shutdown")))
reactor.callWhenRunning(reactor.stop)
reactor.run()
self.assertEquals(events, [("before", "shutdown"),
("during", "shutdown"),
("after", "shutdown")])
def test_shutdownFiresTriggersAsynchronously(self):
"""
C{"before"} C{"shutdown"} triggers are not run synchronously from
L{reactor.stop}.
"""
reactor = self.buildReactor()
events = []
reactor.addSystemEventTrigger(
"before", "shutdown", events.append, "before shutdown")
def stopIt():
reactor.stop()
events.append("stopped")
reactor.callWhenRunning(stopIt)
self.assertEqual(events, [])
reactor.run()
self.assertEqual(events, ["stopped", "before shutdown"])
def test_shutdownDisconnectsCleanly(self):
"""
A L{IFileDescriptor.connectionLost} implementation which raises an
exception does not prevent the remaining L{IFileDescriptor}s from
having their C{connectionLost} method called.
"""
lostOK = [False]
# Subclass FileDescriptor to get logPrefix
class ProblematicFileDescriptor(FileDescriptor):
def connectionLost(self, reason):
raise RuntimeError("simulated connectionLost error")
class OKFileDescriptor(FileDescriptor):
def connectionLost(self, reason):
lostOK[0] = True
reactor = self.buildReactor()
# Unfortunately, it is necessary to patch removeAll to directly control
# the order of the returned values. The test is only valid if
# ProblematicFileDescriptor comes first. Also, return these
# descriptors only the first time removeAll is called so that if it is
# called again the file descriptors aren't re-disconnected.
fds = iter([ProblematicFileDescriptor(), OKFileDescriptor()])
reactor.removeAll = lambda: fds
reactor.callWhenRunning(reactor.stop)
self.runReactor(reactor)
self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
self.assertTrue(lostOK[0])
def test_multipleRun(self):
"""
C{reactor.run()} raises L{ReactorAlreadyRunning} when called when
the reactor is already running.
"""
events = []
def reentrantRun():
self.assertRaises(ReactorAlreadyRunning, reactor.run)
events.append("tested")
reactor = self.buildReactor()
reactor.callWhenRunning(reentrantRun)
reactor.callWhenRunning(reactor.stop)
reactor.run()
self.assertEqual(events, ["tested"])
def test_runWithAsynchronousBeforeStartupTrigger(self):
"""
When there is a C{'before'} C{'startup'} trigger which returns an
unfired L{Deferred}, C{reactor.run()} starts the reactor and does not
return until after C{reactor.stop()} is called
"""
events = []
def trigger():
events.append('trigger')
d = Deferred()
d.addCallback(callback)
reactor.callLater(0, d.callback, None)
return d
def callback(ignored):
events.append('callback')
reactor.stop()
reactor = self.buildReactor()
reactor.addSystemEventTrigger('before', 'startup', trigger)
reactor.run()
self.assertEqual(events, ['trigger', 'callback'])
def test_iterate(self):
"""
C{reactor.iterate()} does not block.
"""
reactor = self.buildReactor()
t = reactor.callLater(5, reactor.crash)
start = time.time()
reactor.iterate(0) # Shouldn't block
elapsed = time.time() - start
self.failUnless(elapsed < 2)
t.cancel()
def test_crash(self):
"""
C{reactor.crash()} stops the reactor and does not fire shutdown
triggers.
"""
reactor = self.buildReactor()
events = []
reactor.addSystemEventTrigger(
"before", "shutdown",
lambda: events.append(("before", "shutdown")))
reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
reactor.run()
self.assertFalse(reactor.running)
self.assertFalse(
events,
"Shutdown triggers invoked but they should not have been.")
def test_runAfterCrash(self):
"""
C{reactor.run()} restarts the reactor after it has been stopped by
C{reactor.crash()}.
"""
events = []
def crash():
events.append('crash')
reactor.crash()
reactor = self.buildReactor()
reactor.callWhenRunning(crash)
reactor.run()
def stop():
events.append(('stop', reactor.running))
reactor.stop()
reactor.callWhenRunning(stop)
reactor.run()
self.assertEqual(events, ['crash', ('stop', True)])
def test_runAfterStop(self):
"""
C{reactor.run()} raises L{ReactorNotRestartable} when called when
the reactor is being run after getting stopped priorly.
"""
events = []
def restart():
self.assertRaises(ReactorNotRestartable, reactor.run)
events.append('tested')
reactor = self.buildReactor()
reactor.callWhenRunning(reactor.stop)
reactor.addSystemEventTrigger('after', 'shutdown', restart)
reactor.run()
self.assertEqual(events, ['tested'])
globals().update(SystemEventTestsBuilder.makeTestCaseClasses())
globals().update(ObjectModelIntegrationTest.makeTestCaseClasses())