Source code for libmushu.driver.emotiv

#!/usr/bin/env python

# emotiv.py
# Copyright (C) 2013  Bastian Venthur
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


from Crypto.Cipher import AES
import numpy as np

import usb.core
import usb.util

from libmushu.amplifier import Amplifier


# USB vendor and product id that are passed to usb.core.find in order to
# locate the Emotiv device.
VENDOR_ID = 0x1234
PRODUCT_ID = 0xed02

# Address of the IN endpoint the data frames are read from (see
# Epoc.get_data).
ENDPOINT_IN = usb.util.ENDPOINT_IN | 2  # second endpoint


class Epoc(Amplifier):
    """Driver for the Emotiv EPOC, accessed directly via USB.

    Each 32 byte frame read from the device is AES-decrypted and unpacked
    into one sample holding the values named in ``self.channel``.

    """

    def __init__(self):
        """Find the device, claim its interfaces and prepare decryption.

        Raises
        ------
        RuntimeError
            If no device with the expected vendor- and product id is found.

        """
        # find amplifier
        self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
        if self.dev is None:
            raise RuntimeError('Emotiv device is not connected.')
        # pyusb docs say you *have* to call set_configuration, but it does
        # not work unless we *don't* call it, so it is deliberately skipped.
        # get the serial number
        # NOTE(review): the (length, index) arguments of get_string and the
        # interface argument of Device.read (see get_data) look like an
        # early pyusb API -- confirm against the installed pyusb version.
        serial = usb.util.get_string(self.dev, 17, self.dev.iSerialNumber)
        # claim the device and its two interfaces
        for interface in (0, 1):
            if self.dev.is_kernel_driver_active(interface):
                self.dev.detach_kernel_driver(interface)
            usb.util.claim_interface(self.dev, interface)
        # prepare AES; the mode is given explicitly instead of relying on
        # the library default
        self.cipher = AES.new(self.generate_key(serial, True), AES.MODE_ECB)
        # internal states for battery and impedance we have to store since
        # it is not sent with every frame.
        self._battery = 0
        self._quality = [0] * 14
        # channel info, in the order in which parse_raw emits the values.
        # The quality labels are derived from the electrode names so both
        # lists always agree.
        electrodes = ['F3', 'FC5', 'AF3', 'F7', 'T7', 'P7', 'O1',
                      'O2', 'P8', 'T8', 'F8', 'AF4', 'FC6', 'F4']
        self.channel = (['Counter', 'Battery']
                        + electrodes
                        + ['Gyro 1', 'Gyro 2']
                        + ['Quality ' + name for name in electrodes])

    def get_data(self):
        """Read, decrypt and parse a single frame from the device.

        Returns
        -------
        data : numpy array
            Array of shape ``(1, len(self.channel))`` holding one sample.
            If reading or decoding the frame fails, the error is printed
            and an array with zero samples, shape
            ``(0, len(self.channel))``, is returned instead.
        markers : list
            Always empty.

        """
        try:
            raw = self.dev.read(ENDPOINT_IN, 32, 1, timeout=1000)
            sample = self.parse_raw(self.decrypt(raw))
        except Exception as e:
            # best effort: report the problem and deliver no samples
            # instead of aborting the acquisition.
            print(e)
            return np.zeros((0, len(self.channel)), dtype=int), []
        return np.array(sample).reshape(1, -1), []

    @staticmethod
    def is_available():
        """Return True if the device is found on the USB bus."""
        dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
        return dev is not None

    def generate_key(self, sn, research=True):
        """Generate the encryption key.

        The key is based on the serial number of the device and the
        information whether it is a research- or consumer device.

        Parameters
        ----------
        sn : str
            Serial number of the device. The characters at the indices 12
            to 15 are used, so it must be at least 16 characters long.
        research : bool, optional
            True selects the key layout for research devices, False the
            one for consumer devices.

        Returns
        -------
        key : str
            The 16 character key.

        Raises
        ------
        IndexError
            If ``sn`` has less than 16 characters.

        """
        # the key alternates between a character of the serial number and
        # a constant; only the constants differ between the device types.
        if research:
            fill = ('\x00', '\x54', '\x10', '\x42',
                    '\x00', '\x48', '\x00', '\x50')
        else:
            fill = ('\x00', '\x48', '\x00', '\x54',
                    '\x10', '\x42', '\x00', '\x50')
        serial_chars = (sn[15], sn[14], sn[13], sn[12]) * 2
        return ''.join(s + f for s, f in zip(serial_chars, fill))

    def decrypt(self, raw):
        """Decrypt a raw package.

        Parameters
        ----------
        raw : sequence of bytes
            The encrypted 32 byte frame as read from the device.

        Returns
        -------
        value : int
            The 32 decrypted bytes combined into a single 256 bit integer,
            the first byte being the most significant one.

        Raises
        ------
        IndexError
            If the decrypted frame is shorter than 32 bytes.

        """
        data = self.cipher.decrypt(raw[:16]) + self.cipher.decrypt(raw[16:])
        if len(data) < 32:
            raise IndexError('Decrypted frame has %d bytes, expected 32.'
                             % len(data))
        value = 0
        # iterating a bytearray yields integers on Python 2 and Python 3
        for byte in bytearray(data[:32]):
            value = (value << 8) | byte
        return value

    def parse_raw(self, raw):
        """Parse raw data.

        Parameters
        ----------
        raw : int
            A decrypted frame as 256 bit integer, as returned by
            ``decrypt``.

        Returns
        -------
        data : list of int
            32 values: counter, battery, 14 data values, 2 gyroscope
            values and the 14 stored quality values.

        """
        mask14 = 0b11111111111111
        shift = 256
        # 1x counter / battery (8 bit)
        # if the first bit is not set, the remaining 7 bits are the counter
        # otherwise the remaining bits are the battery
        shift -= 8
        head = (raw >> shift) & 0b11111111
        if head & 0b10000000:
            # battery frame, the counter is reported as 128
            counter = 128
            self._battery = head & 0b01111111
        else:
            counter = head & 0b01111111
        data = [counter, self._battery]
        # 7x data (14 bit)
        for _ in range(7):
            shift -= 14
            data.append((raw >> shift) & mask14)
        # 1x quality (14 bit)
        # we assume it is the contact quality for an electrode, the counter
        # gives the number of the electrode. since we only have 14
        # electrodes we only take the values from counters 0..13 and 64..77
        shift -= 14
        quality = (raw >> shift) & mask14
        if counter < 128 and counter % 64 < 14:
            self._quality[counter % 64] = quality
        # 1x ??? (14 bit), skipped
        shift -= 14
        # 7x data (14 bit)
        for _ in range(7):
            shift -= 14
            data.append((raw >> shift) & mask14)
        # 2x gyroscope (8 bit)
        # the first bit denotes the sign the remaining 7 bits the number
        for _ in range(2):
            shift -= 8
            byte = (raw >> shift) & 0b11111111
            value = (byte & 0b01111111) - 100
            if byte & 0b10000000:
                value = -value
            data.append(value)
        # 1x ??? (8 bit), the last byte of the frame is not used
        data.extend(self._quality)
        return [int(i) for i in data]
if __name__ == '__main__':
    # Small manual test: connect to the device and print every sample
    # until reading raises an exception.
    amp = Epoc()
    print('Reading...')
    while True:
        try:
            sample = amp.get_data()
            print(sample)
        except Exception as e:
            print(e)
            break