ShellBanner
System:Linux MiraNet 3.0.0-14-generic-pae #23-Ubuntu SMP Mon Nov 21 22:07:10 UTC 2011 i686
Software:Apache. PHP/5.3.6-13ubuntu3.10
ID:uid=65534(nobody) gid=65534(nogroup) groups=65534(nogroup)
Safe Mode:OFF
Open_Basedir:OFF
Freespace:20.29 GB of 70.42 GB (28.81%)
MySQL: ON MSSQL: OFF Oracle: OFF PostgreSQL: OFF Curl: OFF Sockets: ON Fetch: OFF Wget: ON Perl: ON
Disabled Functions: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,

/ usr/ lib/ python2.7/ dist-packages/ twisted/ internet/ test/ - drwxr-xr-x

Directory:
Viewing file:     test_unix.py (5.36 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUNIX}.
"""

from stat import S_IMODE
from os import stat
from sys import platform

try:
    from socket import AF_UNIX
except ImportError:
    AF_UNIX = None

from zope.interface.verify import verifyObject

from twisted.python.hashlib import md5
from twisted.internet.interfaces import IConnector
from twisted.internet.address import UNIXAddress
from twisted.internet import interfaces
from twisted.internet.protocol import (
    ServerFactory, ClientFactory, DatagramProtocol)
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.internet.test.test_tcp import TCPPortTestsBuilder



class UNIXFamilyMixin:
    """
    Test-helper defining mixin for things related to AF_UNIX sockets.
    """
    if AF_UNIX is None:
        skip = "Platform does not support AF_UNIX sockets"

    def _modeTest(self, methodName, path, factory):
        """
        Assert that the mode of the created unix socket is set to the mode
        specified to the reactor method.
        """
        mode = 0600
        reactor = self.buildReactor()
        unixPort = getattr(reactor, methodName)(path, factory, mode=mode)
        unixPort.stopListening()
        self.assertEqual(S_IMODE(stat(path).st_mode), mode)


def _abstractPath(case):
    """
    Return a new, unique abstract namespace path to be listened on.
    """
    # Use the test cases's mktemp to get something unique, but also squash it
    # down to make sure it fits in the unix socket path limit (something around
    # 110 bytes).
    return md5(case.mktemp()).hexdigest()


class UNIXTestsBuilder(UNIXFamilyMixin, ReactorBuilder):
    """
    Builder defining tests relating to L{IReactorUNIX}.
    """
    def test_interface(self):
        """
        L{IReactorUNIX.connectUNIX} returns an object providing L{IConnector}.
        """
        reactor = self.buildReactor()
        connector = reactor.connectUNIX(self.mktemp(), ClientFactory())
        self.assertTrue(verifyObject(IConnector, connector))


    def test_mode(self):
        """
        The UNIX socket created by L{IReactorUNIX.listenUNIX} is created with
        the mode specified.
        """
        self._modeTest('listenUNIX', self.mktemp(), ServerFactory())


    def test_listenOnLinuxAbstractNamespace(self):
        """
        On Linux, a UNIX socket path may begin with C{'\0'} to indicate a socket
        in the abstract namespace.  L{IReactorUNIX.listenUNIX} accepts such a
        path.
        """
        # Don't listen on a path longer than the maximum allowed.
        path = _abstractPath(self)
        reactor = self.buildReactor()
        port = reactor.listenUNIX('\0' + path, ServerFactory())
        self.assertEquals(port.getHost(), UNIXAddress('\0' + path))
    if platform != 'linux2':
        test_listenOnLinuxAbstractNamespace.skip = (
            'Abstract namespace UNIX sockets only supported on Linux.')


    def test_connectToLinuxAbstractNamespace(self):
        """
        L{IReactorUNIX.connectUNIX} also accepts a Linux abstract namespace
        path.
        """
        path = _abstractPath(self)
        reactor = self.buildReactor()
        connector = reactor.connectUNIX('\0' + path, ClientFactory())
        self.assertEquals(
            connector.getDestination(), UNIXAddress('\0' + path))
    if platform != 'linux2':
        test_connectToLinuxAbstractNamespace.skip = (
            'Abstract namespace UNIX sockets only supported on Linux.')



class UNIXDatagramTestsBuilder(UNIXFamilyMixin, ReactorBuilder):
    """
    Builder defining tests relating to L{IReactorUNIXDatagram}.
    """
    # There's no corresponding test_connectMode because the mode parameter to
    # connectUNIXDatagram has been completely ignored since that API was first
    # introduced.
    def test_listenMode(self):
        """
        The UNIX socket created by L{IReactorUNIXDatagram.listenUNIXDatagram}
        is created with the mode specified.
        """
        self._modeTest('listenUNIXDatagram', self.mktemp(), DatagramProtocol())


    def test_listenOnLinuxAbstractNamespace(self):
        """
        On Linux, a UNIX socket path may begin with C{'\0'} to indicate a socket
        in the abstract namespace.  L{IReactorUNIX.listenUNIXDatagram} accepts
        such a path.
        """
        path = _abstractPath(self)
        reactor = self.buildReactor()
        port = reactor.listenUNIXDatagram('\0' + path, DatagramProtocol())
        self.assertEquals(port.getHost(), UNIXAddress('\0' + path))
    if platform != 'linux2':
        test_listenOnLinuxAbstractNamespace.skip = (
            'Abstract namespace UNIX sockets only supported on Linux.')



class UNIXPortTestsBuilder(TCPPortTestsBuilder):
    """
    Tests for L{IReactorUNIX.listenUnix}
    """

    requiredInterfaces = [interfaces.IReactorUNIX]

    def getListeningPort(self, reactor):
        """
        Get a UNIX port from a reactor
        """
        return reactor.listenUNIX(self.mktemp(), ServerFactory())


    def getExpectedConnectionLostLogMsg(self, port):
        """
        Get the expected connection lost message for a UNIX port
        """
        return "(UNIX Port %s Closed)" % (repr(port.port),)



globals().update(UNIXTestsBuilder.makeTestCaseClasses())
globals().update(UNIXDatagramTestsBuilder.makeTestCaseClasses())
globals().update(UNIXPortTestsBuilder.makeTestCaseClasses())
Command:
Quick Commands:
Upload:
[Read-Only] Max size: 100MB
PHP Filesystem: <@ Ú
Search File:
regexp
Create File:
Overwrite [Read-Only]
View File:
Mass Defacement:
[+] Main Directory: [+] Defacement Url:
LmfaoX Shell - Private Build [BETA] - v0.1 -; Generated: 0.5521 seconds