Viewing file:
test_stdio.py (13.77 KB) -rw-r--r--Select action/file-type:

(
+) |

(
+) |

(
+) |
Code (
+) |
Session (
+) |

(
+) |
SDB (
+) |

(
+) |

(
+) |

(
+) |

(
+) |

(
+) |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.stdio}.
"""
import os, sys, itertools
from twisted.trial import unittest
from twisted.python import filepath, log
from twisted.python.runtime import platform
from twisted.internet import error, defer, protocol, stdio, reactor
from twisted.test.test_tcp import ConnectionLostNotifyingProtocol
# A short string which is intended to appear here and nowhere else,
# particularly not in any random garbage output CPython unavoidably
# generates (such as in warning text and so forth). This is searched
# for in the output from stdio_test_lastwrite.py and if it is found at
# the end, the functionality works.
UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!'

# Skip reason applied to the process-spawning test case below: it stays
# C{None} unless we are on Windows and win32process cannot be imported.
skipWindowsNopywin32 = None

if platform.isWindows():
    try:
        # Only importability matters here; the module itself is not used.
        import win32process
    except ImportError:
        skipWindowsNopywin32 = (
            "On windows, spawnProcess is not available "
            "in the absence of win32process.")
class StandardIOTestProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol used by the tests to gather everything a child process
    writes and to signal interesting points in the child's lifetime.

    @ivar onConnection: A L{defer.Deferred} fired with C{None} from
        C{connectionMade}.

    @ivar onCompletion: A L{defer.Deferred} fired from C{processEnded} with
        the C{reason} that method was given.

    @ivar onDataReceived: Either C{None} (the default, meaning no
        notification is wanted) or a L{defer.Deferred} which is fired with
        this instance the next time C{childDataReceived} runs.  It is reset
        to C{None} before being fired, so it is used at most once.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
        bytes received from the child process on each file descriptor.
    """
    onDataReceived = None

    def __init__(self):
        self.data = {}
        self.onConnection = defer.Deferred()
        self.onCompletion = defer.Deferred()


    def connectionMade(self):
        """
        Announce, through C{onConnection}, that the child is connected.
        """
        self.onConnection.callback(None)


    def childDataReceived(self, name, bytes):
        """
        Append C{bytes} to whatever has already been collected for file
        descriptor C{name}, then fire C{onDataReceived} if one is pending.
        """
        alreadySeen = self.data.get(name, '')
        self.data[name] = alreadySeen + bytes

        pending = self.onDataReceived
        if pending is None:
            return
        # Clear the attribute before firing so later data does not try to
        # fire the same Deferred again.
        self.onDataReceived = None
        pending.callback(self)


    def processEnded(self, reason):
        """
        Hand the exit C{reason} to whoever is waiting on C{onCompletion}.
        """
        self.onCompletion.callback(reason)
class StandardInputOutputTestCase(unittest.TestCase):
skip = skipWindowsNopywin32
def _spawnProcess(self, proto, sibling, *args, **kw):
"""
Launch a child Python process and communicate with it using the
given ProcessProtocol.
@param proto: A L{ProcessProtocol} instance which will be connected
to the child process.
@param sibling: The basename of a file containing the Python program
to run in the child process.
@param *args: strings which will be passed to the child process on
the command line as C{argv[2:]}.
@param **kw: additional arguments to pass to L{reactor.spawnProcess}.
@return: The L{IProcessTransport} provider for the spawned process.
"""
import twisted
subenv = dict(os.environ)
subenv['PYTHONPATH'] = os.pathsep.join(
[os.path.abspath(
os.path.dirname(os.path.dirname(twisted.__file__))),
subenv.get('PYTHONPATH', '')
])
args = [sys.executable,
filepath.FilePath(__file__).sibling(sibling).path,
reactor.__class__.__module__] + list(args)
return reactor.spawnProcess(
proto,
sys.executable,
args,
env=subenv,
**kw)
def _requireFailure(self, d, callback):
def cb(result):
self.fail("Process terminated with non-Failure: %r" % (result,))
def eb(err):
return callback(err)
return d.addCallbacks(cb, eb)
def test_loseConnection(self):
"""
Verify that a protocol connected to L{StandardIO} can disconnect
itself using C{transport.loseConnection}.
"""
errorLogFile = self.mktemp()
log.msg("Child process logging to " + errorLogFile)
p = StandardIOTestProcessProtocol()
d = p.onCompletion
self._spawnProcess(p, 'stdio_test_loseconn.py', errorLogFile)
def processEnded(reason):
# Copy the child's log to ours so it's more visible.
for line in file(errorLogFile):
log.msg("Child logged: " + line.rstrip())
self.failIfIn(1, p.data)
reason.trap(error.ProcessDone)
return self._requireFailure(d, processEnded)
def test_readConnectionLost(self):
"""
When stdin is closed and the protocol connected to it implements
L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
is called.
"""
errorLogFile = self.mktemp()
log.msg("Child process logging to " + errorLogFile)
p = StandardIOTestProcessProtocol()
p.onDataReceived = defer.Deferred()
def cbBytes(ignored):
d = p.onCompletion
p.transport.closeStdin()
return d
p.onDataReceived.addCallback(cbBytes)
def processEnded(reason):
reason.trap(error.ProcessDone)
d = self._requireFailure(p.onDataReceived, processEnded)
self._spawnProcess(
p, 'stdio_test_halfclose.py', errorLogFile)
return d
def test_lastWriteReceived(self):
"""
Verify that a write made directly to stdout using L{os.write}
after StandardIO has finished is reliably received by the
process reading that stdout.
"""
p = StandardIOTestProcessProtocol()
# Note: the OS X bug which prompted the addition of this test
# is an apparent race condition involving non-blocking PTYs.
# Delaying the parent process significantly increases the
# likelihood of the race going the wrong way. If you need to
# fiddle with this code at all, uncommenting the next line
# will likely make your life much easier. It is commented out
# because it makes the test quite slow.
# p.onConnection.addCallback(lambda ign: __import__('time').sleep(5))
try:
self._spawnProcess(
p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING,
usePTY=True)
except ValueError, e:
# Some platforms don't work with usePTY=True
raise unittest.SkipTest(str(e))
def processEnded(reason):
"""
Asserts that the parent received the bytes written by the child
immediately after the child starts.
"""
self.assertTrue(
p.data[1].endswith(UNIQUE_LAST_WRITE_STRING),
"Received %r from child, did not find expected bytes." % (
p.data,))
reason.trap(error.ProcessDone)
return self._requireFailure(p.onCompletion, processEnded)
def test_hostAndPeer(self):
"""
Verify that the transport of a protocol connected to L{StandardIO}
has C{getHost} and C{getPeer} methods.
"""
p = StandardIOTestProcessProtocol()
d = p.onCompletion
self._spawnProcess(p, 'stdio_test_hostpeer.py')
def processEnded(reason):
host, peer = p.data[1].splitlines()
self.failUnless(host)
self.failUnless(peer)
reason.trap(error.ProcessDone)
return self._requireFailure(d, processEnded)
def test_write(self):
"""
Verify that the C{write} method of the transport of a protocol
connected to L{StandardIO} sends bytes to standard out.
"""
p = StandardIOTestProcessProtocol()
d = p.onCompletion
self._spawnProcess(p, 'stdio_test_write.py')
def processEnded(reason):
self.assertEquals(p.data[1], 'ok!')
reason.trap(error.ProcessDone)
return self._requireFailure(d, processEnded)
def test_writeSequence(self):
"""
Verify that the C{writeSequence} method of the transport of a
protocol connected to L{StandardIO} sends bytes to standard out.
"""
p = StandardIOTestProcessProtocol()
d = p.onCompletion
self._spawnProcess(p, 'stdio_test_writeseq.py')
def processEnded(reason):
self.assertEquals(p.data[1], 'ok!')
reason.trap(error.ProcessDone)
return self._requireFailure(d, processEnded)
def _junkPath(self):
junkPath = self.mktemp()
junkFile = file(junkPath, 'w')
for i in xrange(1024):
junkFile.write(str(i) + '\n')
junkFile.close()
return junkPath
def test_producer(self):
"""
Verify that the transport of a protocol connected to L{StandardIO}
is a working L{IProducer} provider.
"""
p = StandardIOTestProcessProtocol()
d = p.onCompletion
written = []
toWrite = range(100)
def connectionMade(ign):
if toWrite:
written.append(str(toWrite.pop()) + "\n")
proc.write(written[-1])
reactor.callLater(0.01, connectionMade, None)
proc = self._spawnProcess(p, 'stdio_test_producer.py')
p.onConnection.addCallback(connectionMade)
def processEnded(reason):
self.assertEquals(p.data[1], ''.join(written))
self.failIf(toWrite, "Connection lost with %d writes left to go." % (len(toWrite),))
reason.trap(error.ProcessDone)
return self._requireFailure(d, processEnded)
def test_consumer(self):
"""
Verify that the transport of a protocol connected to L{StandardIO}
is a working L{IConsumer} provider.
"""
p = StandardIOTestProcessProtocol()
d = p.onCompletion
junkPath = self._junkPath()
self._spawnProcess(p, 'stdio_test_consumer.py', junkPath)
def processEnded(reason):
self.assertEquals(p.data[1], file(junkPath).read())
reason.trap(error.ProcessDone)
return self._requireFailure(d, processEnded)
def test_normalFileStandardOut(self):
"""
If L{StandardIO} is created with a file descriptor which refers to a
normal file (ie, a file from the filesystem), L{StandardIO.write}
writes bytes to that file. In particular, it does not immediately
consider the file closed or call its protocol's C{connectionLost}
method.
"""
onConnLost = defer.Deferred()
proto = ConnectionLostNotifyingProtocol(onConnLost)
path = filepath.FilePath(self.mktemp())
self.normal = normal = path.open('w')
self.addCleanup(normal.close)
kwargs = dict(stdout=normal.fileno())
if not platform.isWindows():
# Make a fake stdin so that StandardIO doesn't mess with the *real*
# stdin.
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
kwargs['stdin'] = r
connection = stdio.StandardIO(proto, **kwargs)
# The reactor needs to spin a bit before it might have incorrectly
# decided stdout is closed. Use this counter to keep track of how
# much we've let it spin. If it closes before we expected, this
# counter will have a value that's too small and we'll know.
howMany = 5
count = itertools.count()
def spin():
for value in count:
if value == howMany:
connection.loseConnection()
return
connection.write(str(value))
break
reactor.callLater(0, spin)
reactor.callLater(0, spin)
# Once the connection is lost, make sure the counter is at the
# appropriate value.
def cbLost(reason):
self.assertEquals(count.next(), howMany + 1)
self.assertEquals(
path.getContent(),
''.join(map(str, range(howMany))))
onConnLost.addCallback(cbLost)
return onConnLost
if reactor.__class__.__name__ == 'EPollReactor':
test_normalFileStandardOut.skip = (
"epoll(7) does not support normal files. See #4429. "
"This should be a todo but technical limitations prevent "
"this.")
elif platform.isWindows():
test_normalFileStandardOut.skip = (
"StandardIO does not accept stdout as an argument to Windows. "
"Testing redirection to a file is therefore harder.")
def test_normalFileStandardOutGoodEpollError(self):
"""
Using StandardIO with epollreactor with stdout redirected to a
normal file fails with a comprehensible error (until it is
supported, when #4429 is resolved). See also #2259 and #3442.
"""
path = filepath.FilePath(self.mktemp())
normal = path.open('w')
fd = normal.fileno()
self.addCleanup(normal.close)
exc = self.assertRaises(
RuntimeError,
stdio.StandardIO, protocol.Protocol(), stdout=fd)
self.assertEquals(
str(exc),
"This reactor does not support this type of file descriptor (fd "
"%d, mode %d) (for example, epollreactor does not support normal "
"files. See #4429)." % (fd, os.fstat(fd).st_mode))
if reactor.__class__.__name__ != 'EPollReactor':
test_normalFileStandardOutGoodEpollError.skip = (
"Only epollreactor is expected to fail with stdout redirected "
"to a normal file.")