|
System | : | Linux MiraNet 3.0.0-14-generic-pae #23-Ubuntu SMP Mon Nov 21 22:07:10 UTC 2011 i686 |
Software | : | Apache. PHP/5.3.6-13ubuntu3.10 |
ID | : | uid=65534(nobody) gid=65534(nogroup) groups=65534(nogroup)
|
|
Safe Mode | : | OFF |
Open_Basedir | : | OFF |
Freespace | : | 21.68 GB of 70.42 GB (30.79%) |
|
MySQL: ON MSSQL: OFF Oracle: OFF PostgreSQL: OFF Curl: OFF Sockets: ON Fetch: OFF Wget: ON Perl: ON |
Disabled Functions: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
|
[ System Info ]
[ Processes ]
[ SQL Manager ]
[ Eval ]
[ Encoder ]
[ Mailer ]
[ Back Connection ]
[ Backdoor Server ]
[ Kernel Exploit Search ]
[ MD5 Decrypter ]
[ Reverse IP ]
[ Kill Shell ]
[ FTP Brute-Force ]
|
|
/
usr/
lib/
python2.6/
dist-packages/
twisted/
protocols/
- drwxr-xr-x
|
Viewing file: pcp.py (6.92 KB) -rw-r--r--Select action/file-type:  ( +) |  ( +) |  ( +) | Code ( +) | Session ( +) |  ( +) | SDB ( +) |  ( +) |  ( +) |  ( +) |  ( +) |  ( +) |
# -*- test-case-name: twisted.test.test_pcp -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" Producer-Consumer Proxy. """
from zope.interface import implements
from twisted.internet import interfaces
class BasicProducerConsumerProxy: """ I can act as a man in the middle between any Producer and Consumer.
@ivar producer: the Producer I subscribe to. @type producer: L{IProducer<interfaces.IProducer>} @ivar consumer: the Consumer I publish to. @type consumer: L{IConsumer<interfaces.IConsumer>} @ivar paused: As a Producer, am I paused? @type paused: bool """ implements(interfaces.IProducer, interfaces.IConsumer)
consumer = None producer = None producerIsStreaming = None iAmStreaming = True outstandingPull = False paused = False stopped = False
def __init__(self, consumer): self._buffer = [] if consumer is not None: self.consumer = consumer consumer.registerProducer(self, self.iAmStreaming)
# Producer methods:
def pauseProducing(self): self.paused = True if self.producer: self.producer.pauseProducing()
def resumeProducing(self): self.paused = False if self._buffer: # TODO: Check to see if consumer supports writeSeq. self.consumer.write(''.join(self._buffer)) self._buffer[:] = [] else: if not self.iAmStreaming: self.outstandingPull = True
if self.producer is not None: self.producer.resumeProducing()
def stopProducing(self): if self.producer is not None: self.producer.stopProducing() if self.consumer is not None: del self.consumer
# Consumer methods:
def write(self, data): if self.paused or (not self.iAmStreaming and not self.outstandingPull): # We could use that fifo queue here. self._buffer.append(data)
elif self.consumer is not None: self.consumer.write(data) self.outstandingPull = False
def finish(self): if self.consumer is not None: self.consumer.finish() self.unregisterProducer()
def registerProducer(self, producer, streaming): self.producer = producer self.producerIsStreaming = streaming
def unregisterProducer(self): if self.producer is not None: del self.producer del self.producerIsStreaming if self.consumer: self.consumer.unregisterProducer()
def __repr__(self): return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
class ProducerConsumerProxy(BasicProducerConsumerProxy): """ProducerConsumerProxy with a finite buffer.
When my buffer fills up, I have my parent Producer pause until my buffer has room in it again. """ # Copies much from abstract.FileDescriptor bufferSize = 2**2**2**2
producerPaused = False unregistered = False
def pauseProducing(self): # Does *not* call up to ProducerConsumerProxy to relay the pause # message through to my parent Producer. self.paused = True
def resumeProducing(self): self.paused = False if self._buffer: data = ''.join(self._buffer) bytesSent = self._writeSomeData(data) if bytesSent < len(data): unsent = data[bytesSent:] assert not self.iAmStreaming, ( "Streaming producer did not write all its data.") self._buffer[:] = [unsent] else: self._buffer[:] = [] else: bytesSent = 0
if (self.unregistered and bytesSent and not self._buffer and self.consumer is not None): self.consumer.unregisterProducer()
if not self.iAmStreaming: self.outstandingPull = not bytesSent
if self.producer is not None: bytesBuffered = sum([len(s) for s in self._buffer]) # TODO: You can see here the potential for high and low # watermarks, where bufferSize would be the high mark when we # ask the upstream producer to pause, and we wouldn't have # it resume again until it hit the low mark. Or if producer # is Pull, maybe we'd like to pull from it as much as necessary # to keep our buffer full to the low mark, so we're never caught # without something to send. if self.producerPaused and (bytesBuffered < self.bufferSize): # Now that our buffer is empty, self.producerPaused = False self.producer.resumeProducing() elif self.outstandingPull: # I did not have any data to write in response to a pull, # so I'd better pull some myself. self.producer.resumeProducing()
def write(self, data): if self.paused or (not self.iAmStreaming and not self.outstandingPull): # We could use that fifo queue here. self._buffer.append(data)
elif self.consumer is not None: assert not self._buffer, ( "Writing fresh data to consumer before my buffer is empty!") # I'm going to use _writeSomeData here so that there is only one # path to self.consumer.write. But it doesn't actually make sense, # if I am streaming, for some data to not be all data. But maybe I # am not streaming, but I am writing here anyway, because there was # an earlier request for data which was not answered. bytesSent = self._writeSomeData(data) self.outstandingPull = False if not bytesSent == len(data): assert not self.iAmStreaming, ( "Streaming producer did not write all its data.") self._buffer.append(data[bytesSent:])
if (self.producer is not None) and self.producerIsStreaming: bytesBuffered = sum([len(s) for s in self._buffer]) if bytesBuffered >= self.bufferSize:
self.producer.pauseProducing() self.producerPaused = True
def registerProducer(self, producer, streaming): self.unregistered = False BasicProducerConsumerProxy.registerProducer(self, producer, streaming) if not streaming: producer.resumeProducing()
def unregisterProducer(self): if self.producer is not None: del self.producer del self.producerIsStreaming self.unregistered = True if self.consumer and not self._buffer: self.consumer.unregisterProducer()
def _writeSomeData(self, data): """Write as much of this data as possible.
@returns: The number of bytes written. """ if self.consumer is None: return 0 self.consumer.write(data) return len(data)
|