ó
½-'Nc           @   s…   d  Z  d d l Z d d l Z d d l Z d d l Z d d l m Z d d l m Z d Z	 d Z
 d e f d „  ƒ  YZ d	 „  Z d S(
   s   Message storage.iÿÿÿÿN(   t   bpickle(   t
   SERVER_APIt   ht   bt   MessageStorec           B   s@  e  Z d  Z e Z d d# e j d „ Z d „  Z d „  Z d „  Z	 d „  Z
 d „  Z d	 „  Z d
 „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d$ d „ Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d$ d „ Z d d „ Z d „  Z  d „  Z! d „  Z" d  „  Z# d! „  Z$ d" „  Z% RS(%   sÍ  A message store which stores its messages in a file system hierarchy.

    The sequencing system we use in the message store may be quite
    confusing if you haven't looked at it in the last 10 minutes.  For
    that reason, let's review the terminology here.

    Assume we have 10 messages in the store, which we label by
    the following uppercase letters::

        A, B, C, D, E, F, G, H, I, J
                 ^

    Let's say that the next message we should send to the server is D.
    What we call "pending offset" is the displacement from the first
    message, which in our example above would be 3.  What we call
    "sequence" is the number that the server expects us to label message
    D as.  It could be pretty much any natural number, depending on the
    history of our exchanges with the server.  What we call "server
    sequence", is the next message number expected by the *client* itself,
    and is entirely unrelated to the stored messages.
    iè  i<   c         C   sq   | |  _  | |  _ | |  _ i  |  _ | |  _ | j d ƒ |  _ |  j ƒ  } t j	 j
 | ƒ sm t j | ƒ n  d S(   sÐ   
        @param persist: a L{Persist} used to save state parameters like
            the accepted message types, sequence, server uuid etc.
        @param directory: base of the file system hierarchy
        s   message-storeN(   t	   _get_timet
   _directoryt   _directory_sizet   _schemast   _original_persistt   root_att   _persistt   _message_dirt   ost   patht   isdirt   makedirs(   t   selft   persistt	   directoryt   directory_sizet   monitor_intervalt   get_timet   message_dir(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   __init__)   s    					c         C   s   |  j  j ƒ  d S(   s   Persist metadata to disk.N(   R	   t   save(   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   commit:   s    c         C   sN   t  | ƒ t t t f k s! t ‚ |  j j d t t | ƒ ƒ ƒ |  j ƒ  d S(   sæ   Specify the types of messages that the server will expect from us.

        If messages are added to the store which are not currently
        accepted, they will be saved but ignored until their type is
        accepted.
        s   accepted-typesN(   t   typet   tuplet   listt   sett   AssertionErrorR   t   sortedt   _reprocess_holding(   R   t   types(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   set_accepted_types>   s    !c         C   s   |  j  j d d ƒ S(   s)   Get a list of all accepted message types.s   accepted-types(    (   R   t   get(   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_accepted_typesI   s    c         C   s   | |  j  ƒ  k S(   s>   Return bool indicating if C{type} is an accepted message type.(   R%   (   R   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   acceptsM   s    c         C   s   |  j  j d d ƒ S(   s›   Get the current sequence.

        @return: The sequence number of the message that the server expects us to
           send on the next exchange.
        t   sequencei    (   R   R$   (   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_sequenceQ   s    c         C   s   |  j  j d | ƒ d S(   s“   Set the current sequence.

        Set the sequence number of the message that the server expects us to
        send on the next exchange.
        R'   N(   R   R   (   R   t   number(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   set_sequenceY   s    c         C   s   |  j  j d d ƒ S(   sª   Get the current server sequence.

        @return: the sequence number of the message that we will ask the server to
            send to us on the next exchange.
        t   server_sequencei    (   R   R$   (   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_server_sequencea   s    c         C   s   |  j  j d | ƒ d S(   s¡   Set the current server sequence.

        Set the sequence number of the message that we will ask the server to
        send to us on the next exchange.
        R+   N(   R   R   (   R   R)   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   set_server_sequencei   s    c         C   s   |  j  j d ƒ S(   s%   Return the currently set server UUID.t   server_uuid(   R   R$   (   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_server_uuidq   s    c         C   s   |  j  j d | ƒ d S(   s=   Change the known UUID from the server we're communicating to.R.   N(   R   R   (   R   t   uuid(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   set_server_uuidu   s    c         C   s   |  j  j d d ƒ S(   s   Get the current pending offset.t   pending_offseti    (   R   R$   (   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_pending_offsety   s    c         C   s   |  j  j d | ƒ d S(   s²   Set the current pending offset.

        Set the offset into the message pool to consider assigned to the
        current sequence number as returned by l{get_sequence}.
        R2   N(   R   R   (   R   t   val(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   set_pending_offset}   s    c         C   s   |  j  |  j ƒ  | ƒ d S(   s/   Increment the current pending offset by C{val}.N(   R5   R3   (   R   R4   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   add_pending_offset…   s    c         C   s   t  d „  |  j ƒ  Dƒ ƒ S(   s&   Return the number of pending messages.c         s   s   |  ] } d  Vq d S(   i   N(    (   t   .0t   x(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pys	   <genexpr>‹   s    (   t   sumt   _walk_pending_messages(   R   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   count_pending_messages‰   s    c         C   sÝ   |  j  ƒ  } g  } xÄ |  j ƒ  D]¶ } | d k	 rG t | ƒ | k rG Pn  |  j |  j | ƒ ƒ } y t j | ƒ } Wn0 t k
 r¤ } t	 j
 | ƒ |  j | t ƒ q X| d | k rÈ |  j | t ƒ q | j | ƒ q W| S(   s;   Get any pending messages that aren't being held, up to max.R   N(   R%   R:   t   Nonet   lent   _get_contentR   R    t   loadst
   ValueErrort   loggingt	   exceptiont
   _add_flagst   BROKENt   HELDt   append(   R   t   maxt   accepted_typest   messagest   filenamet   datat   messaget   e(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_pending_messages   s    c         C   sy   xr t  j |  j d t t ƒ |  j ƒ  ƒ D]H } t j | ƒ t j j	 | ƒ d } t j
 | ƒ s) t j | ƒ q) q) Wd S(   s>   Delete messages which are unlikely to be needed in the future.t   excludei    N(   t	   itertoolst   islicet   _walk_messagesRE   RD   R3   R   t   unlinkR   t   splitt   listdirt   rmdir(   R   t   fnt   containing_dir(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   delete_old_messages¡   s    c         C   s5   |  j  d ƒ x! |  j ƒ  D] } t j | ƒ q Wd S(   s   Remove ALL stored messages.i    N(   R5   RR   R   RS   (   R   RJ   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   delete_all_messagesª   s    c         C   s   | |  j  | j <d S(   sŠ   Add a schema to be applied to messages of the given type.

        The schema must be an instance of L{landscape.schema.Message}.
        N(   R   R   (   R   t   schema(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt
   add_schema°   s    c         C   s›   d } |  j  ƒ  } x‚ |  j d t ƒ D]n } |  j | ƒ } t | k sR | | k rn t j | ƒ j | k rn t St | k r% t | k r% | d 7} q% q% Wt	 S(   s•   Return bool indicating if C{message_id} still hasn't been delivered.

        @param message_id: Identifier returned by the L{add()} method.
        i    RO   i   (
   R3   RR   RD   t
   _get_flagsRE   R   t   statt   st_inot   Truet   False(   R   t
   message_idt   iR2   RJ   t   flags(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt
   is_pending·   s    c         C   sß   d | k s t  ‚ |  j | d j | ƒ } d | k rH |  j | d <n  t j | ƒ } |  j ƒ  } t | d d ƒ } | j | ƒ | j	 ƒ  t
 j | d | ƒ |  j | d ƒ sÉ |  j | t ƒ } n  t
 j | ƒ j } | S(   s  Queue a message for delivery.

        @param message: a C{dict} with a C{type} key and other keys conforming
            to the L{Message} schema for that specifc message type.
        @return: message_id, which is an identifier for the added message.
        R   t   apis   .tmpt   w(   R   R   t   coerceRf   R    t   dumpst   _get_next_message_filenamet   opent   writet   closeR   t   renameR&   t
   _set_flagsRE   R^   R_   (   R   RL   t   message_dataRJ   t   fileRb   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   addÇ   s    
c         C   sû   |  j  ƒ  } | r | d } n t j |  j d ƒ ƒ d } |  j  | ƒ } | se |  j | d ƒ } n’ t | ƒ |  j k  r¶ t t | d j d ƒ d ƒ d ƒ } |  j | | ƒ } nA |  j t t | ƒ d ƒ ƒ } t j | ƒ t j	 j
 | d ƒ } | S(   Niÿÿÿÿt   0t   _i    i   (   t   _get_sorted_filenamesR   R   R   R=   R   t   strt   intRT   R   t   join(   R   t   message_dirst
   newest_dirt   message_filenamesRJ   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyRj   ê   s    'c         c   sQ   |  j  ƒ  } x> t |  j d t t ƒ ƒ D]  \ } } | | k r) | Vq) q) Wd S(   s,   Walk the files which are definitely pending.RO   N(   R3   t	   enumerateRR   RE   RD   (   R   R2   Rc   RJ   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyR:   ÿ   s    )c         c   s‹   | r t  | ƒ } n  |  j ƒ  } xc | D][ } xR |  j | ƒ D]A } t  |  j | ƒ ƒ } | sk | | @r> |  j | | ƒ Vq> q> Wq( Wd  S(   N(   R   Ru   R]   R   (   R   RO   Ry   R   RJ   Rd   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyRR     s    t    c         C   sQ   g  t  j |  j | ƒ ƒ D] } | j d ƒ s | ^ q } | j d d „  ƒ | S(   Ns   .tmpt   keyc         S   s   t  |  j d ƒ d ƒ S(   NRt   i    (   Rw   RT   (   R8   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   <lambda>  s    (   R   RU   R   t   endswitht   sort(   R   t   dirR8   t   message_files(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyRu     s    c         G   s   t  j j |  j | Œ S(   N(   R   R   Rx   R   (   R   t   args(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyR     s    c         C   s,   t  | ƒ } z | j ƒ  SWd  | j ƒ  Xd  S(   N(   Rk   t   readRm   (   R   RJ   Rq   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyR>     s    c   
      C   sD  d } |  j  ƒ  } |  j ƒ  } x|  j ƒ  D]} |  j | ƒ } y t j |  j | ƒ ƒ } Wn9 t k
 r— } t j	 | ƒ t
 | k r<| d 7} q<q+ X| d | k } t
 | k rü | r<|  j ƒ  }	 t j | |	 ƒ |  j |	 t | ƒ t t
 ƒ ƒ q<q+ | r2| | k r2|  j | t | ƒ t t
 ƒ Bƒ n  | d 7} q+ Wd S(   se   
        Unhold accepted messages left behind, and hold unaccepted
        pending messages.
        i    i   R   N(   R3   R%   RR   R]   R    R?   R>   R@   RA   RB   RE   Rj   R   Rn   Ro   R   (
   R   t   offsetR2   RH   t   old_filenameRd   RL   RM   t   acceptedt   new_filename(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyR!      s(    &#c         C   s3   t  j j | ƒ } d | k r/ | j d ƒ d Sd S(   NRt   i   R}   (   R   R   t   basenameRT   (   R   R   RŠ   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyR]   <  s    c         C   sz   t  j j | ƒ \ } } t  j j | | j d ƒ d ƒ } | rf | d d j t t | ƒ ƒ ƒ 7} n  t  j | | ƒ | S(   NRt   i    R}   (   R   R   RT   Rx   R    R   Rn   (   R   R   Rd   t   dirnameRŠ   t   new_path(    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyRo   B  s    "&c         C   s!   |  j  | |  j | ƒ | ƒ d  S(   N(   Ro   R]   (   R   R   Rd   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyRC   J  s    i  N(&   t   __name__t
   __module__t   __doc__R   Rf   t   timeR   R   R#   R%   R&   R(   R*   R,   R-   R/   R1   R3   R5   R6   R;   R<   RN   RY   RZ   R\   Re   Rr   Rj   R:   RR   Ru   R   R>   R!   R]   Ro   RC   (    (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyR      sD   																				#		
					c          O   sG   d d l  m } t |  | Ž  } x! | j ƒ  D] } | j | ƒ q, W| S(   sF   Get a L{MessageStore} object with all Landscape message schemas added.iÿÿÿÿ(   t   message_schemas(   t   landscape.message_schemasR‘   R   t   valuesR\   (   R„   t   kwargsR‘   t   storeR[   (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   get_default_message_storeN  s
    (   R   R   RP   RA   R   t   landscape.libR    t	   landscapeR   RE   RD   t   objectR   R–   (    (    (    s:   /usr/lib/python2.7/dist-packages/landscape/broker/store.pyt   <module>   s   ÿ ?