ó
[³XMc           @   sâ   d  Z  d d l m Z d d l m Z d d l m Z d d l m Z d d l	 m
 Z
 d d l m Z m Z d d l m Z m Z d	 e f d
 „  ƒ  YZ e e e ƒ d e f d „  ƒ  YZ e e e ƒ d e f d „  ƒ  YZ d S(   s:   
Tests for (new code in) L{twisted.application.internet}.
iÿÿÿÿ(   t
   implements(   t   verifyClass(   t   Factory(   t   TestCase(   t   StreamServerEndpointService(   t   IStreamServerEndpointt   IListeningPort(   t   Deferredt   CancelledErrort
   FakeServerc           B   s]   e  Z d  Z e e ƒ d Z d Z d Z e	 ƒ  Z
 d Z d „  Z d „  Z d „  Z d „  Z RS(   sq  
    In-memory implementation of L{IStreamServerEndpoint}.

    @ivar result: The L{Deferred} resulting from the call to C{listen}, after
        C{listen} has been called.

    @ivar factory: The factory passed to C{listen}.

    @ivar cancelException: The exception to errback C{self.result} when it is
        cancelled.

    @ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire
        with.

    @ivar listenAttempts: The number of times C{listen} has been invoked.

    @ivar failImmediately: If set, the exception to fail the L{Deferred}
        returned from C{listen} before it is returned.
    i    c         C   s   t  ƒ  |  _ d  S(   N(   t   FakePortt   port(   t   self(    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   __init__/   s    c            s_   ˆ  j  d 7_  | ˆ  _ t d ‡  f d †  ƒ ˆ  _ ˆ  j d k	 rX ˆ  j j ˆ  j ƒ n  ˆ  j S(   sw   
        Return a Deferred and store it for future use.  (Implementation of
        L{IStreamServerEndpoint}).
        i   t	   cancellerc            s   |  j  ˆ  j ƒ S(   N(   t   errbackt   cancelException(   t   d(   R   (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   <lambda>;   s    N(   t   listenAttemptst   factoryR   t   resultt   failImmediatelyt   NoneR   (   R   R   (    (   R   sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   listen3   s    	c         C   s   |  j  j |  j ƒ d S(   s°   
        Test code should invoke this method after causing C{listen} to be
        invoked in order to fire the L{Deferred} previously returned from
        C{listen}.
        N(   R   t   callbackR   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   startedListeningA   s    c         C   s   |  j  j j d ƒ d S(   s  
        Test code should invoke this method after causing C{stopListening} to
        be invoked on the port fired from the L{Deferred} returned from
        C{listen} in order to cause the L{Deferred} returned from
        C{stopListening} to fire.
        N(   R   t   deferredR   R   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   stoppedListeningJ   s    N(   t   __name__t
   __module__t   __doc__R    R   R   R   R   R   R   R   R   R   R   R   R   (    (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyR	      s   
					R
   c           B   s'   e  Z d  Z e e ƒ d Z d „  Z RS(   ss   
    Fake L{IListeningPort} implementation.

    @ivar deferred: The L{Deferred} returned by C{stopListening}.
    c         C   s   t  ƒ  |  _ |  j S(   N(   R   R   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   stopListeningb   s    N(   R   R   R   R    R   R   R   R    (    (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyR
   W   s   
t   TestEndpointServicec           B   st   e  Z d  Z d „  Z d „  Z d d „ Z d „  Z d „  Z d „  Z	 d „  Z
 d „  Z d	 „  Z d
 „  Z d „  Z RS(   s4   
    Tests for L{twisted.application.internet}.
    c         C   s4   t  ƒ  |  _ t ƒ  |  _ t |  j |  j ƒ |  _ d S(   sp   
        Construct a stub server, a stub factory, and a
        L{StreamServerEndpointService} to test.
        N(   R	   t
   fakeServerR   R   R   t   svc(   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   setUpo   s    c         C   s*   |  j  j ƒ  |  j |  j |  j j ƒ d S(   s‡   
        L{StreamServerEndpointService.privilegedStartService} calls its
        endpoint's C{listen} method with its factory.
        N(   R#   t   privilegedStartServicet   assertIdenticalR   R"   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   test_privilegedStartServicey   s    c         C   s;   t  ƒ  |  j _ t |  j _ |  j t  | p3 |  j j ƒ d S(   sÜ  
        L{StreamServerEndpointService.startService} should raise synchronously
        if the L{Deferred} returned by its wrapped
        L{IStreamServerEndpoint.listen} has already fired with an errback and
        the L{StreamServerEndpointService}'s C{_raiseSynchronously} flag has
        been set.  This feature is necessary to preserve compatibility with old
        behavior of L{twisted.internet.strports.service}, which is to return a
        service which synchronously raises an exception from C{startService}
        (so that, among other things, twistd will not start running).  However,
        since L{IStreamServerEndpoint.listen} may fail asynchronously, it is
        a bad idea to rely on this behavior.
        N(   t   ZeroDivisionErrorR"   R   t   TrueR#   t   _raiseSynchronouslyt   assertRaisest   startService(   R   t   thunk(    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt(   test_synchronousRaiseRaisesSynchronously‚   s    c         C   s   |  j  |  j j ƒ d S(   sÒ   
        L{StreamServerEndpointService.privilegedStartService} should behave the
        same as C{startService} with respect to
        L{TestEndpointService.test_synchronousRaiseRaisesSynchronously}.
        N(   R.   R#   R%   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   test_synchronousRaisePrivileged”   s    c         C   sL   |  j  j ƒ  |  j j j t ƒ  ƒ |  j t ƒ } |  j t | ƒ d ƒ d S(   s  
        L{StreamServerEndpointService.startService} and
        L{StreamServerEndpointService.privilegedStartService} should both log
        an exception when the L{Deferred} returned from their wrapped
        L{IStreamServerEndpoint.listen} fails.
        i   N(	   R#   R,   R"   R   R   R(   t   flushLoggedErrorst   assertEqualst   len(   R   t   logged(    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   test_failReportsErrorž   s    c         C   sE   t  ƒ  |  j _ |  j j ƒ  |  j t  ƒ } |  j t | ƒ d ƒ d S(   sŸ   
        Without the C{_raiseSynchronously} compatibility flag, failing
        immediately has the same behavior as failing later; it logs the error.
        i   N(   R(   R"   R   R#   R,   R0   R1   R2   (   R   R3   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt    test_synchronousFailReportsError«   s    c         C   s@   |  j  j ƒ  |  j |  j |  j j ƒ |  j |  j  j t ƒ d S(   sÃ   
        L{StreamServerEndpointService.startService} sets the C{running} flag,
        and calls its endpoint's C{listen} method with its factory, if it
        has not yet been started.
        N(   R#   R,   R&   R   R"   R1   t   runningR)   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   test_startServiceUnstarted¶   s    c         C   sG   |  j  ƒ  |  j j ƒ  |  j |  j j d ƒ |  j |  j j t ƒ d S(   sš   
        L{StreamServerEndpointService.startService} sets the C{running} flag,
        but nothing else, if the service has already been started.
        i   N(   R'   R#   R,   R1   R"   R   R6   R)   (   R   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   test_startServiceStartedÁ   s    
c         C   sœ   |  j  j ƒ  |  j j ƒ  |  j  j ƒ  |  j  j ƒ  } g  } | j | j ƒ |  j t	 | ƒ d ƒ |  j j
 ƒ  |  j t	 | ƒ d ƒ |  j |  j  j ƒ d S(   sã   
        L{StreamServerEndpointService.stopService} calls C{stopListening} on
        the L{IListeningPort} returned from its endpoint, returns the
        C{Deferred} from stopService, and sets C{running} to C{False}.
        i    i   N(   R#   R%   R"   R   R,   t   stopServicet   addCallbackt   appendR1   R2   R   t   assertFalseR6   (   R   R   t   l(    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   test_stopServiceÌ   s    c         C   sb   |  j  j ƒ  |  j  j ƒ  } g  } | j | j ƒ |  j | d g ƒ |  j |  j t ƒ g  ƒ d S(   sÚ   
        L{StreamServerEndpointService.stopService} cancels the L{Deferred}
        returned by C{listen} if it has not yet fired.  No error will be logged
        about the cancellation of the listen attempt.
        N(	   R#   R%   R9   t   addBothR;   R1   R   R0   R   (   R   R   R=   (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt#   test_stopServiceBeforeStartFinishedß   s    c         C   s}   t  ƒ  |  j _ |  j j ƒ  |  j j ƒ  } g  } | j | j ƒ |  j | d g ƒ |  j
 t  ƒ } |  j t | ƒ d ƒ d S(   sá   
        L{StreamServerEndpointService.stopService} cancels the L{Deferred}
        returned by C{listen} if it has not fired yet.  An error will be logged
        if the resulting exception is not L{CancelledError}.
        i   N(   R(   R"   R   R#   R%   R9   R:   R;   R1   R   R0   R2   (   R   R   R=   t   stoppingErrors(    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt    test_stopServiceCancelStartErrorí   s    N(   R   R   R   R$   R'   R   R.   R/   R4   R5   R7   R8   R>   R@   RB   (    (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyR!   j   s   	
			
						N(   R   t   zope.interfaceR    t   zope.interface.verifyR   t   twisted.internet.protocolR   t   twisted.trial.unittestR   t   twisted.application.internetR   t   twisted.internet.interfacesR   R   t   twisted.internet.deferR   R   t   objectR	   R
   R!   (    (    (    sJ   /usr/lib/python2.7/dist-packages/twisted/application/test/test_internet.pyt   <module>   s   A