Source code for pikos.recorders.zeromq_recorder

# -*- coding: utf-8 -*-
#------------------------------------------------------------------------------
#  Package: Pikos toolkit
#  File: recorders/zeromq_recorder.py
#  License: LICENSE.TXT
#
#  Copyright (c) 2012, Enthought, Inc.
#  All rights reserved.
#------------------------------------------------------------------------------
import cPickle as pickle
import os

import zmq

from pikos.recorders.abstract_recorder import AbstractRecorder


class RecordingStopped(object):
    pass


[docs]class ZeroMQRecorder(AbstractRecorder): """ The ZeroMQ Recorder is a recorder that publishes each set of values on a 0MQ publish socket. Private ------- _filter : callable Used to check if the set `record` should be `recored`. The function accepts a tuple of the `record` values and return True is the input sould be recored. _ready : bool Singify that the Recorder is ready to accept data. Please use the Recorder.ready property """
[docs] def __init__(self, zmq_host='127.0.0.1', zmq_port=9001, filter_=None, wait_for_ready=True, **kwargs): """ Class initialization. Parameters ---------- filter_ : callable A callable function that accepts a data tuple and returns True if the input sould be recorded. """ self._context = zmq.Context() self._socket = self._context.socket(zmq.PUB) self._socket.bind('tcp://{0}:{1}'.format(zmq_host, zmq_port)) if wait_for_ready: self._prepare_socket = self._context.socket(zmq.REQ) self._prepare_socket.connect('tcp://{0}:{1}'.format( zmq_host, zmq_port + 1)) else: self._prepare_socket = None self._filter = (lambda x: True) if filter_ is None else filter_ self._ready = not wait_for_ready
[docs] def prepare(self, record): """ Write the header in the csv file the first time it is called. """ if not self._ready: ready = False handshake_message = pickle.dumps( (os.getpid(), "Memory", record._fields)) while not ready: self._prepare_socket.send(handshake_message) ready = pickle.loads(self._prepare_socket.recv()) is True self._ready = True self._prepare_socket.close() self._prepare_socket = None
[docs] def finalize(self): """ Signal that recording has ended. """ if self._ready: self._socket.send(pickle.dumps(RecordingStopped()))
@property
[docs] def ready(self): """ Is the recorder ready to accept data? """ return self._ready
[docs] def record(self, record): """ Rerord entry onlty when the filter function returns True. """ if self._ready and self._filter(record): message = (os.getpid(), record) self._socket.send(pickle.dumps(message))