Source code for libmushu.ampdecorator

# ampdecorator.py
# Copyright (C) 2013  Bastian Venthur
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


"""
This module provides the :class:`AmpDecorator` class.

As a user, it is very unlikely that you'll have to deal with it
directly. Its main purpose is to add additional functionality to the low
level amplifier drivers. This functionality includes features like:
saving data to a file. or being able to receive marker via TCP/IP.

By using the :func:`libmushu.__init__.get_amp` method, you'll
automatically receive decorated amplifiers.

"""

from __future__ import division

import select
import socket
import time
from multiprocessing import Process, Queue, Event
import os
import struct
import json
import logging

from libmushu.amplifier import Amplifier


logger = logging.getLogger(__name__)
logger.info('Logger started')


END_MARKER = '\n'
BUFSIZE = 2**16
PORT = 12345


[docs]class AmpDecorator(Amplifier): """This class 'decorates' the Low-Level Amplifier classes with TCP-Marker and Save-To-File functionality. You use it by decorating (not as in Python-Decorator, but in the GoF sense) the low level amplifier class you want to use:: import libmushu from libmushu.ampdecorator import AmpDecorator from libmushu.driver.randomamp import RandomAmp amp = Ampdecorator(RandomAmp) Waring: The TCP marker timings on Windows have a resolution of 10ms-15ms. On Linux the resolution is 1us. This is due to limitations of Python's time.time method, or rather a Windows specific issue. There exists currently no precise timer, providing times which are comparable between two running processes on Windows. The performance counter provided on Windows, has a much better resolution but is relative to the processes start time and it drifts (1s per 100s), so it is only precise for a relatively short amount of time. If a higher precision is needed one has to replace the time.time calls with something which provides a better precision. For example one could create a third process which provides times or regularly synchronize both processes with the clock synchronization algorithm as described here: http://en.wikipedia.org/wiki/Network_Time_Protocol Alternatively one could use `timeGetTime` from Windows' Multi Media library, which is tunable via `timeBeginPeriod` and provides a precision of 1-2ms. Apparently this is the way Chrome and many others do it.:: from __future__ import division from ctypes import windll import time timeBeginPeriod = windll.winmm.timeBeginPeriod timeEndPeriod = windll.winmm.timeEndPeriod timeGetTime = windll.winmm.timeGetTime if __name__ == '__main__': # wrap the code that needs high precision in timeBegin- and # timeEndPeriod with the same parameter. The parameter is # the interval in ms you want as precision. Usually the # minimum value allowed is 1 (best). timeBeginPeriod(1) times = [] t_start = time.time() while time.time() < (time.time() + 1): times.append(timeGetTime()) times = sorted(list(set(times))) print(1000 / len(times)) timeEndPeriod(1) """ def __init__(self, ampcls): self.amp = ampcls() self.write_to_file = False @property
[docs] def presets(self): return self.amp.presets
[docs] def start(self, filename=None): # prepare files for writing self.write_to_file = False if filename is not None: self.write_to_file = True filename_marker = filename + '.marker' filename_eeg = filename + '.eeg' filename_meta = filename + '.meta' for filename in filename_marker, filename_eeg, filename_meta: if os.path.exists(filename): logger.error('A file "%s" already exists, aborting.' % filename) raise Exception self.fh_eeg = open(filename_eeg, 'w') self.fh_marker = open(filename_marker, 'w') self.fh_meta = open(filename_meta, 'w') # write meta data meta = {'Channels': self.amp.get_channels(), 'Sampling Frequency': self.amp.get_sampling_frequency(), 'Amp': str(self.amp) } json.dump(meta, self.fh_meta, indent=4) # start the marker server self.marker_queue = Queue() self.tcp_reader_running = Event() self.tcp_reader_running.set() tcp_reader_ready = Event() self.tcp_reader = Process(target=tcp_reader, args=(self.marker_queue, self.tcp_reader_running, tcp_reader_ready) ) self.tcp_reader.start() logger.debug('Waiting for TCP reader to become ready...') tcp_reader_ready.wait() logger.debug('TCP reader is ready...') # zero the sample counter self.received_samples = 0 # start the amp self.amp.start()
[docs] def stop(self): # stop the amp self.amp.stop() # stop the marker server self.tcp_reader_running.clear() logger.debug('Waiting for TCP reader process to stop...') self.tcp_reader.join() logger.debug('TCP reader process stopped.') # close the files if self.write_to_file: logger.debug('Closing files.') for fh in self.fh_eeg, self.fh_marker, self.fh_meta: fh.close()
[docs] def configure(self, **kwargs): self.amp.configure(**kwargs)
[docs] def get_data(self): """Get data from the amplifier. This method is supposed to get called as fast as possible (i.e hundreds of times per seconds) and returns the data and the markers. Returns ------- data : 2darray a numpy array (time, channels) of the EEG data markers : list of (float, str) a list of markers. Each element is a tuple of timestamp and string. The timestamp is the time in ms relative to the onset of the block of data. Note that negative values are *allowed* as well as values bigger than the length of the block of data returned. That is to be interpreted as a marker from the last block and a marker for a future block respectively. """ # get data and marker from underlying amp data, marker = self.amp.get_data() t = time.time() # length in sec of the new block according to #samples and fs block_duration = len(data) / self.amp.get_sampling_frequency() # abs time of start of the block t0 = t - block_duration # duration of all blocks except the current one duration = self.received_samples / self.amp.get_sampling_frequency() # merge markers tcp_marker = [] while not self.marker_queue.empty(): m = self.marker_queue.get() m[0] = (m[0] - t0) * 1000 tcp_marker.append(m) marker = sorted(marker + tcp_marker) # save data to files if self.write_to_file: for m in marker: self.fh_marker.write("%f %s\n" % (1000 * duration + m[0], m[1])) for t in data: for c in t: self.fh_eeg.write(struct.pack("f", c)) self.received_samples += len(data) return data, marker
[docs] def get_channels(self): return self.amp.get_channels()
[docs] def get_sampling_frequency(self): return self.amp.get_sampling_frequency()
[docs]def tcp_reader(queue, running, ready): """Marker-reading process. This method runs in a seperate process and receives TCP markers. Whenever a marker is received, it is put together with a timestamp into a queue. Parameters ---------- queue : Queue this queue is used to send markers to a different process running : Event this event is used to signal this process to terminate its main loop ready : Event this signal is used to signal the "parent"-process that this process is ready to receive marker """ # setup the server socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.setblocking(0) server_socket.bind(('localhost', PORT)) server_socket.listen(5) ready.set() # read the socket until 'running' is set to false rlist, wlist, elist = [server_socket], [], [] while running.is_set(): readables, _, _ = select.select(rlist, wlist, elist, 1) t = time.time() for sock in readables: if sock == server_socket: # a new connection is opened connection, address = sock.accept() logger.debug('Incoming connection from %s', address) rlist.append(connection) data_buffer = '' else: data = sock.recv(BUFSIZE) if not data: # an open connection was closed logger.debug('Connection closed') sock.close() rlist.remove(sock) else: # received maybe incomplete data from a # connection # TODO: are we? I thought we're using TCP? data_buffer = ''.join([data_buffer, data]) split = data_buffer.rsplit(END_MARKER, 1) messages = [] if len(split) > 1: # data_buffer contains at least one complete # message data_buffer = split[1] messages = split[0].split(END_MARKER) for m in messages: queue.put([t, m]) logger.debug('Terminated select-loop') # close all connections and the socket for s in rlist: s.close()