Source code for libmushu.driver.gtec

#!/usb/bin/env python

# gtec.py
# Copyright (C) 2013  Bastian Venthur
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


# TODO: update to new version of pyusb

import struct
import time
from exceptions import Exception
import logging

import usb
from scipy.signal import iirfilter
import numpy as np

from libmushu.amplifier import Amplifier


logger = logging.getLogger(__name__)
logger.info('Logger started')

ID_VENDOR_GTEC = 0x153c
# I saw an am with this vendorid too
ID_VENDOR_GTEC2 = 0x15c3
ID_PRODUCT_GUSB_AMP = 0x0001

CX_OUT = usb.TYPE_VENDOR | usb.ENDPOINT_OUT


[docs]class GUSBamp(Amplifier): def __init__(self): logger.info('Initializing GUSBamp instance') # list of available amps self.amps = [] for bus in usb.busses(): for device in bus.devices: if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and device.idProduct == ID_PRODUCT_GUSB_AMP): self.amps.append(device) self.devh = None self.mode = None # Initialize the amplifier and make it ready. device = self.amps[0] self.devh = device.open() # detach kernel driver if nessecairy config = device.configurations[0] self.devh.setConfiguration(config) assert(len(config.interfaces) > 0) # sometimes it is the other one first_interface = config.interfaces[0][0] if first_interface is None: first_interface = config.interfaces[0][1] first_setting = first_interface.alternateSetting self.devh.claimInterface(first_interface) self.devh.setAltInterface(first_interface) # initialization straight from the usb-dump self.set_mode('data') self.devh.controlMsg(CX_OUT, 0xb6, value=0x80, buffer=0) self.devh.controlMsg(CX_OUT, 0xb5, value=0x80, buffer=0) self.devh.controlMsg(CX_OUT, 0xb9, value=0x00, buffer="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10") self.set_slave_mode(False) self.devh.controlMsg(CX_OUT, 0xd3, value=0x01, buffer=0) self.devh.controlMsg(CX_OUT, 0xca, value=0x01, buffer=0) self.devh.controlMsg(CX_OUT, 0xc8, value=0x01, buffer="\x00"*16) self.set_common_reference() self.set_common_ground() self.set_calibration_mode('sine') self.set_sampling_ferquency(128, [False for i in range(16)], None, None)
[docs] def start(self): self.devh.controlMsg(CX_OUT, 0xb5, value=0x08, buffer=0) self.devh.controlMsg(CX_OUT, 0xf7, value=0x00, buffer=0)
[docs] def stop(self): self.devh.controlMsg(CX_OUT, 0xb8, [])
[docs] def get_data(self): """Get data.""" # TODO: should we use numpy arrays right here? # TODO: what is the in-endpoint # 0x2 or 0x86 endpoint = 0x86 # TODO what is the optimal number here size = 2028 #512 try: # TODO what is the optimal timeout here? data = self.devh.bulkRead(endpoint, size, 100) except usb.USBError: data = [] data = ''.join(map(chr, data)) data = np.fromstring(data, np.float32, len(data)/4) try: data = data.reshape(-1, 17) except: logger.error("Got incomplete packet from the amp, discarding it!") data = np.array([]).reshape(-1, 17) if self.mode == 'impedance': data = self.calculate_impedance(data) elif self.mode == 'data': # get data in mV data /= 8.15 return data, []
[docs] def get_channels(self): return [str(i) for i in range(17)]
@staticmethod
[docs] def is_available(): for bus in usb.busses(): for device in bus.devices: if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and device.idProduct == ID_PRODUCT_GUSB_AMP): return True return False ########################################################################### # Low level amplifier methods ###########################################################################
[docs] def set_mode(self, mode): """Set mode, 'impedance', 'data'.""" if mode == 'impedance': self.devh.controlMsg(CX_OUT, 0xc9, value=0x00, buffer=0) self.devh.controlMsg(CX_OUT, 0xc2, value=0x03, buffer=0) self.mode = 'impedance' elif mode == 'calibrate': self.devh.controlMsg(CX_OUT, 0xc1, value=0x00, buffer=0) self.devh.controlMsg(CX_OUT, 0xc2, value=0x02, buffer=0) self.mode = 'calibration' elif mode == 'data': self.devh.controlMsg(CX_OUT, 0xc0, value=0x00, buffer=0) self.devh.controlMsg(CX_OUT, 0xc2, value=0x01, buffer=0) self.mode = 'data' else: raise AmpError('Unknown mode: %s' % mode)
[docs] def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter): """ Set the sampling frequency and filters for individual channels. Parameters: fs -- sampling frequency channels -- list of booleans: channels[0] == True: enable filter for channel 0 bpfilter -- tuple: parameters for the band pass filter (hp, lp, fs, order) or None notchfilter -- tuple: parameters for the band stop filter (hp, lp, fs, order) or None """ # we have: hp, lp, fs, order, typ # signal.iirfilter(order/2, [hp/(fs/2), lp/(fs/2)], ftype='butter', btype='band') # we get 18 coeffs and put them in as '<d' in the buffer # struct.pack('<'+'d'*18, *coeffs) # special filter: means no filter null_filter = "\x00\x00\x00\x00\x00\x00\xf0\x3f"+"\x00\x00\x00\x00\x00\x00\x00\x00"*17 if bpfilter: bp_hp, bp_lp, bp_fs, bp_order = bpfilter bp_b, bp_a = iirfilter(bp_order/2, [bp_hp/(bp_fs/2), bp_lp/(bp_fs/2)], ftype='butter', btype='band') bp_filter = list(bp_b) bp_filter.extend(list(bp_a)) bp_filter = struct.pack("<"+"d"*18, *bp_filter) else: bp_filter = null_filter if notchfilter: bs_hp, bs_lp, bs_fs, bs_order = notchfilter bs_b, bs_a = iirfilter(bs_order/2, [bs_hp/(bs_fs/2), bs_lp/(bs_fs/2)], ftype='butter', btype='bandstop') bs_filter = list(bs_b) # the notch filter has (always?) an order of 4 so fill the gaps with # zeros if len(bs_filter) < 9: diff = 9 - len(bs_filter) bs_filter.extend([0.0 for i in range(diff)]) bs_filter.extend(list(bs_a)) if len(bs_filter) < 18: diff = 18 - len(bs_filter) bs_filter.extend([0.0 for i in range(diff)]) bs_filter = struct.pack("<"+"d"*18, *bs_filter) else: bs_filter = null_filter # set the filters for all channels if bpfilter == notchfilter == None: self.devh.controlMsg(CX_OUT, 0xc6, value=0x01, buffer=bp_filter) self.devh.controlMsg(CX_OUT, 0xc7, value=0x01, buffer=bs_filter) else: idx = 1 for i in channels: if i: self.devh.controlMsg(CX_OUT, 0xc6, value=idx, buffer=bp_filter) self.devh.controlMsg(CX_OUT, 0xc7, value=idx, buffer=bs_filter) idx += 1 # set the sampling frequency self.devh.controlMsg(CX_OUT, 0xb6, value=fs, buffer=0)
[docs] def set_calibration_mode(self, mode): # buffer: [0x03, 0xd0, 0x07, 0x02, 0x00, 0xff, 0x07] # ==== ========== # (1) mode: # (2) amplitude: little endian (0x07d0 = 2000) if mode == 'sine': self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x03\xd0\x07\x02\x00\xff\x07") elif mode == 'sawtooth': self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x02\xd0\x07\x02\x00\xff\x07") elif mode == 'whitenoise': self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x05\xd0\x07\x02\x00\xff\x07") elif mode == 'square': self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x01\xd0\x07\x02\x00\xff\x07") else: raise AmpError('Unknown mode: %s' % mode)
[docs] def calculate_impedance(self, u_measured, u_applied=1e4): return (u_measured * 1e6) / (u_applied - u_measured) - 1e4
[docs] def set_common_ground(self, a=False, b=False, c=False, d=False): """Set common ground for the electrodes. Parameters: a, b, c, d -- correspond to the groups on the amp, either of them can be true or false """ v = (d << 3) + (c << 2) + (b << 1) + a self.devh.controlMsg(CX_OUT, 0xbe, value=v, buffer=0)
[docs] def set_common_reference(self, a=False, b=False, c=False, d=False): """Set common reference for the electrodes. Parameters: a, b, c, d -- correspond to the groups on the amp, either of them can be true or false """ v = (d << 3) + (c << 2) + (b << 1) + a self.devh.controlMsg(CX_OUT, 0xbf, value=v, buffer=0)
[docs] def set_slave_mode(self, slave): """Set amp into slave or master mode. Parameters: slave -- if true, set into slave mode, set to master otherwise """ v = 1 if slave else 0 self.devh.controlMsg(CX_OUT, 0xcd, value=v, buffer=0)
[docs]class AmpError(Exception): pass
[docs]def main(): amp = GUSBamp() amp.start() try: while True: t = time.time() data = amp.get_data() dt = time.time() - t if len(data) > 0: print "%.5f seconds (%.5f ps), length: %d" % (dt, (len(data) / 16.) * 1/dt, len(data)) finally: amp.stop()
if __name__ == '__main__': import sys import cProfile if len(sys.argv) > 1 and sys.argv[1].startswith('prof'): cProfile.run('main()', 'prof') else: main()