IO developers’ guide

Guidelines for IO implementation

There are two ways to add a new IO module:
  • By directly adding a new IO class in a module within neo.io: the reader/writer will deal directly with Neo objects
  • By adding a RawIO class in a module within neo.rawio: the reader should work with raw buffers from the file and provide some internal headers for the scale/units/name/… You can then generate an IO module simply by inheriting from your RawIO class and from neo.io.BaseFromRaw

For read-only support, we encourage you to write a RawIO class, because it allows slice reading and is generally much quicker and easier to implement (although it only covers reading) than a full IO class. For read/write support you can mix the two levels: neo.rawio for reading and neo.io for writing.
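
For example, generating a neo.io class from a RawIO class takes only a few lines; a minimal sketch is given below (the class name MyFormatIO is a placeholder, and the built-in ExampleRawIO stands in for your own RawIO). The complete ExampleIO at the end of this page follows the same pattern.

from neo.io.basefromrawio import BaseFromRaw
from neo.rawio.examplerawio import ExampleRawIO


class MyFormatIO(ExampleRawIO, BaseFromRaw):
    """Reading is delegated to the RawIO layer; write support, if any,
    would be added here at the neo.io level."""

    def __init__(self, filename=''):
        ExampleRawIO.__init__(self, filename=filename)
        BaseFromRaw.__init__(self, filename)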

Recipe to develop an IO module for a new data format:
  1. Fully understand the object model. See Neo core. If in doubt, ask the mailing list.
  2. Fully understand neo.rawio.examplerawio. It is a fake IO that illustrates the API. If in doubt, ask the list.
  3. Copy/paste examplerawio.py and choose clear file and class names for your IO.
  4. Implement all methods that raise NotImplementedError in neo.rawio.baserawio. Return None when an object is not supported (spikes/waveforms).
  5. Write good docstrings. List dependencies, including minimum version numbers.
  6. Add your class to neo.rawio.__init__. Keep imports inside try/except for dependency reasons (a sketch of this pattern follows this list).
  7. Create a class in neo/io/.
  8. Add your class to neo.io.__init__. Keep imports inside try/except for dependency reasons.
  9. Create an account at https://gin.g-node.org and deposit files in NeuralEnsemble/ephy_testing_data.
  10. Write tests in neo/rawio/tests/test_xxxxxrawio.py. You must at least pass the standard tests (inherited from BaseTestRawIO). See test_examplerawio.py.
  11. Write a similar test in neo/test/iotest/test_xxxxxio.py. See test_exampleio.py.
  12. Make a pull request when all tests pass.
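
Steps 6 and 8 both ask for guarded imports. Here is a minimal sketch of what such a guarded registration could look like; the list name rawiolist and the surrounding layout are illustrative assumptions, not a copy of Neo's actual __init__ files.

# illustrative sketch only: "rawiolist" and this layout are assumptions,
# not the actual contents of neo/rawio/__init__.py
rawiolist = []

try:
    # an IO whose optional dependency may be missing on some systems
    from neo.rawio.examplerawio import ExampleRawIO
    rawiolist.append(ExampleRawIO)
except ImportError:
    # the dependency is not installed: skip registration so that
    # the rest of neo.rawio still imports cleanly
    pass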

Miscellaneous

  • If your IO supports several versions of a format (like ABF1, ABF2), upload all possible file versions to the gin.g-node.org test file repository (for test coverage).
  • neo.core.Block.create_many_to_one_relationship() offers a utility to complete the hierarchy once all one-to-many relationships have been created (see the sketch after this list).
  • In the docstring, explain where you obtained the file format specification if it is a closed one.
  • If your IO is based on a database mapper, keep in mind that the returned object MUST be detached, because this object can be written to another url for copying.
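
As an illustration of the create_many_to_one_relationship() utility mentioned above, here is a minimal sketch using only the public Neo core API:

import numpy as np
import quantities as pq

from neo.core import AnalogSignal, Block, Segment

# build the one-to-many side of the hierarchy by hand
bl = Block(name='example block')
seg = Segment(name='segment 0')
bl.segments.append(seg)
seg.analogsignals.append(
    AnalogSignal(np.zeros((1000, 4), dtype='float32'),
                 units='uV', sampling_rate=10 * pq.kHz))

# fill in the reverse (many-to-one) links: seg.block, anasig.segment, ...
bl.create_many_to_one_relationship()
assert seg.block is bl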

Tests

neo.rawio.tests.common_rawio_test.BaseTestRawIO and neo.test.iotest.common_io_test.BaseTestIO provide standard tests. To use them you need to upload some sample data files to gin.g-node.org; they will be publicly accessible for testing Neo. These tests:

  • check compliance with the schema: hierarchy, attribute types, …
  • for IO modules able to both write and read data, compare a generated dataset with the same data after a write/read cycle.

The test scripts download all files from gin.g-node.org and store them locally in /tmp/files_for_tests/. Subsequent test runs use the previously downloaded files rather than downloading them again each time.

Each test must have at least one class that inherits from BaseTestRawIO and has 3 attributes:
  • rawioclass: the class
  • entities_to_test: a list of files (or directories) to be tested one by one
  • files_to_download: a list of files to download (sometimes bigger than entities_to_test)

Here is an example test script taken from the distribution: test_axonrawio.py:

# -*- coding: utf-8 -*-

# needed for python 3 compatibility
from __future__ import unicode_literals, print_function, division, absolute_import

import unittest

from neo.rawio.axonrawio import AxonRawIO

from neo.rawio.tests.common_rawio_test import BaseTestRawIO


class TestAxonRawIO(BaseTestRawIO, unittest.TestCase, ):
    rawioclass = AxonRawIO
    entities_to_test = [
        'File_axon_1.abf',  # V2.0
        'File_axon_2.abf',  # V1.8
        'File_axon_3.abf',  # V1.8
        'File_axon_4.abf',  # 2.0
        'File_axon_5.abf',  # V.20
        'File_axon_6.abf',  # V.20
        'File_axon_7.abf',  # V2.6
    ]
    files_to_download = entities_to_test

    def test_read_raw_protocol(self):
        reader = AxonRawIO(filename=self.get_filename_path('File_axon_7.abf'))
        reader.parse_header()

        reader.read_raw_protocol()


if __name__ == "__main__":
    unittest.main()

Logging

All IO classes have logging set up by default, using the standard logging module. The logger name is the same as the fully qualified class name, e.g. neo.io.hdf5io.NeoHdf5IO. The class.logger attribute holds the logger for easy access.

There are generally 3 types of situation in which an IO class should use a logger:

  • Recoverable errors with the file that the users need to be notified about. In this case, please use logger.warning() or logger.error(). If there is an exception associated with the issue, you can use logger.exception() in the exception handler to automatically include a backtrace with the log. By default, all users will see messages at this level, so please restrict it only to problems the user absolutely needs to know about.
  • Informational messages that advanced users might want to see in order to get some insight into the file. In this case, please use logger.info().
  • Messages useful to developers to fix problems with the io class. In this case, please use logger.debug().

A log handler is automatically added to neo, so please do not add your own handler. Use the class.logger attribute to access the logger inside the class rather than logging.getLogger(). Please do not log directly to the root logger (e.g. logging.warning()); use the class’s logger instead (class.logger.warning()). In the tests for the IO class, if you intentionally test broken files, please disable logging by setting the logging level to 100.
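
Here is a hedged sketch of the three levels in use; the class below is hypothetical and only illustrates the conventions (real IO classes already carry an equivalent logger attribute):

import logging


class FakeFormatIO(object):
    # hypothetical class, only to illustrate the levels described above;
    # a real Neo IO class already has a "logger" attribute like this one
    logger = logging.getLogger('neo.io.fakeformatio.FakeFormatIO')

    def check_file(self, header_ok, n_channels):
        if not header_ok:
            # recoverable problem the user must be told about
            self.logger.warning('header checksum mismatch, continuing anyway')
        # insight for advanced users
        self.logger.info('found %d channels', n_channels)
        # developer-only detail
        self.logger.debug('finished parsing the raw header')


# in tests that intentionally read broken files, silence the output
logging.getLogger('neo').setLevel(100)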

ExampleIO

class neo.io.ExampleIO(filename='')

Here are the two files in full, neo/rawio/examplerawio.py followed by neo/io/exampleio.py:

# -*- coding: utf-8 -*-
"""
ExampleRawIO is a fake example class.
This is to be used when coding a new RawIO.


Rules for creating a new class:
  1. Step 1: Create the main class
    * Create a file in **neo/rawio/** that ends with "rawio.py"
    * Create the class that inherits BaseRawIO
    * copy/paste all methods that need to be implemented.
      See the end of neo.rawio.baserawio.BaseRawIO
    * code hard! The main difficulty **is _parse_header()**.
      In short you have to create a mandatory dict that
      contains channel information::

            self.header = {}
            self.header['nb_block'] = 2
            self.header['nb_segment'] = [2, 3]
            self.header['signal_channels'] = sig_channels
            self.header['unit_channels'] = unit_channels
            self.header['event_channels'] = event_channels

  2. Step 2: RawIO test:
    * create a file in neo/rawio/tests with the same name and a "test_" prefix
    * copy/paste neo/rawio/tests/test_examplerawio.py and do the same

  3. Step 3: Create the neo.io class with the wrapper
    * Create a file in neo/io/ that ends with "io.py"
    * Create a class that inherits both your RawIO class and BaseFromRaw
    * copy/paste from neo/io/exampleio.py

  4. Step 4: IO test
    * create a file in neo/test/iotest with the same previous name with a "test_" prefix
    * copy/paste from neo/test/iotest/test_exampleio.py



"""
from __future__ import unicode_literals, print_function, division, absolute_import

from .baserawio import (BaseRawIO, _signal_channel_dtype, _unit_channel_dtype,
                        _event_channel_dtype)

import numpy as np


class ExampleRawIO(BaseRawIO):
    """
    Class for "reading" fake data from an imaginary file.

    For the user, it gives access to raw data (signals, events, spikes) as they
    are in the (fake) file, as int16 and int64.

    For a developer, it is just an example showing guidelines for someone who wants
    to develop a new IO module.

    Two rules for developers:
      * Respect the Neo RawIO API (:ref:`_neo_rawio_API`)
      * Follow :ref:`_io_guiline`

    This fake IO:
        * has 2 blocks
        * blocks have 2 and 3 segments
        * has 16 signal_channels with sample_rate = 10000
        * has 3 unit_channels
        * has 2 event channels: one has *type=event*, the other has
          *type=epoch*


    Usage:
        >>> import neo.rawio
        >>> reader = neo.rawio.ExampleRawIO(filename='itisafake.nof')
        >>> reader.parse_header()
        >>> print(reader)
        >>> raw_chunk = reader.get_analogsignal_chunk(block_index=0, seg_index=0,
                            i_start=0, i_stop=1024, channel_indexes=None)
        >>> float_chunk = reader.rescale_signal_raw_to_float(raw_chunk, dtype='float64',
                            channel_indexes=[0, 3, 6])
        >>> spike_timestamps = reader.get_spike_timestamps(unit_index=0,
                            t_start=None, t_stop=None)
        >>> spike_times = reader.rescale_spike_timestamp(spike_timestamps, 'float64')
        >>> ev_timestamps, _, ev_labels = reader.get_event_timestamps(event_channel_index=0)

    """
    extensions = ['fake']
    rawmode = 'one-file'

    def __init__(self, filename=''):
        BaseRawIO.__init__(self)
        # note that this filename is used in self._source_name
        self.filename = filename

    def _source_name(self):
        # this function is used by __repr__
        # for general cases self.filename is good
        # But for URL you could mask some part of the URL to keep
        # the main part.
        return self.filename

    def _parse_header(self):
        # This is the central method of a RawIO:
        # we need to collect from the original format all the
        # information needed for fast access later,
        # at any place in the file.
        # In short, _parse_header can be slow but
        # _get_analogsignal_chunk needs to be as fast as possible

        # create signal channels information
        # This is mandatory!!!!
        # gain/offset/units are really important because
        # the scaling to real values will be done with them
        # at the end: real_signal = (raw_signal * gain + offset) * pq.Quantity(units)
        sig_channels = []
        for c in range(16):
            ch_name = 'ch{}'.format(c)
            # our channel id is c+1 just for fun
            # Note that chan_id should be related to the
            # original channel id in the file format
            # so that the end user is not lost when reading datasets
            chan_id = c + 1
            sr = 10000.  # Hz
            dtype = 'int16'
            units = 'uV'
            gain = 1000. / 2 ** 16
            offset = 0.
            # group_id is only for special cases, when channels have different
            # sampling rates for instance. See TdtIO for that.
            # Here this is the general case: all channels have the same characteristics
            group_id = 0
            sig_channels.append((ch_name, chan_id, sr, dtype, units, gain, offset, group_id))
        sig_channels = np.array(sig_channels, dtype=_signal_channel_dtype)

        # create unit channels
        # This is mandatory!!!!
        # Note that if there are no waveforms at all in the file
        # then wf_units/wf_gain/wf_offset/wf_left_sweep/wf_sampling_rate
        # can be set to any value because _get_spike_raw_waveforms
        # will return None
        unit_channels = []
        for c in range(3):
            unit_name = 'unit{}'.format(c)
            unit_id = '#{}'.format(c)
            wf_units = 'uV'
            wf_gain = 1000. / 2 ** 16
            wf_offset = 0.
            wf_left_sweep = 20
            wf_sampling_rate = 10000.
            unit_channels.append((unit_name, unit_id, wf_units, wf_gain,
                                  wf_offset, wf_left_sweep, wf_sampling_rate))
        unit_channels = np.array(unit_channels, dtype=_unit_channel_dtype)

        # create event/epoch channels
        # This is mandatory!!!!
        # In RawIO, epochs and events are dealt with in the same way.
        event_channels = []
        event_channels.append(('Some events', 'ev_0', 'event'))
        event_channels.append(('Some epochs', 'ep_1', 'epoch'))
        event_channels = np.array(event_channels, dtype=_event_channel_dtype)

        # fill the header dict
        # This is mandatory!!!!!
        self.header = {}
        self.header['nb_block'] = 2
        self.header['nb_segment'] = [2, 3]
        self.header['signal_channels'] = sig_channels
        self.header['unit_channels'] = unit_channels
        self.header['event_channels'] = event_channels

        # insert some annotations at some places
        # at the neo.io level, IOs are free to add annotations
        # to any object. To keep this functionality with the wrapper
        # BaseFromRaw you can add annotations in a nested dict.
        self._generate_minimal_annotations()
        # If you are a lazy dev you can stop here.
        for block_index in range(2):
            bl_ann = self.raw_annotations['blocks'][block_index]
            bl_ann['name'] = 'Block #{}'.format(block_index)
            bl_ann['block_extra_info'] = 'This is the block {}'.format(block_index)
            for seg_index in range([2, 3][block_index]):
                seg_ann = bl_ann['segments'][seg_index]
                seg_ann['name'] = 'Seg #{} Block #{}'.format(
                    seg_index, block_index)
                seg_ann['seg_extra_info'] = 'This is the seg {} of block {}'.format(
                    seg_index, block_index)
                for c in range(16):
                    anasig_an = seg_ann['signals'][c]
                    anasig_an['info'] = 'This is a good signal'
                for c in range(3):
                    spiketrain_an = seg_ann['units'][c]
                    spiketrain_an['quality'] = 'Good!!'
                for c in range(2):
                    event_an = seg_ann['events'][c]
                    if c == 0:
                        event_an['nickname'] = 'Miss Event 0'
                    elif c == 1:
                        event_an['nickname'] = 'MrEpoch 1'

    def _segment_t_start(self, block_index, seg_index):
        # this must return a float scaled in seconds
        # this t_start will be shared by all objects in the segment
        # except AnalogSignal
        all_starts = [[0., 15.], [0., 20., 60.]]
        return all_starts[block_index][seg_index]

    def _segment_t_stop(self, block_index, seg_index):
        # this must return a float scaled in seconds
        all_stops = [[10., 25.], [10., 30., 70.]]
        return all_stops[block_index][seg_index]

    def _get_signal_size(self, block_index, seg_index, channel_indexes=None):
        # we are lucky: signals in all segments have the same shape!! (10.0 seconds)
        # it is not always the case
        # this must return an int = the number of samples

        # Note that channel_indexes can be ignored in most cases
        # except when there are several sampling rates.
        return 100000

    def _get_signal_t_start(self, block_index, seg_index, channel_indexes):
        # This gives the t_start of the signals.
        # Very often this is equal to _segment_t_start but not
        # always.
        # this must return a float scaled in seconds

        # Note that channel_indexes can be ignored in most cases
        # except when there are several sampling rates.

        # Here it is the same.
        # this is not always the case
        return self._segment_t_start(block_index, seg_index)

    def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, channel_indexes):
        # this must return a signal chunk limited by
        # i_start/i_stop (which can be None)
        # channel_indexes can be None (=all channels), a list or a numpy.array
        # This must return a 2D numpy array (even with one channel).
        # This must return the original dtype. No conversion here.
        # This must be as fast as possible.
        # Everything that can be done in _parse_header() must not be here.

        # Here we are lucky: our signals are always zeros!!
        # it is not always the case
        # internally signals are int16
        # conversion to real units is done with self.header['signal_channels']

        if i_start is None:
            i_start = 0
        if i_stop is None:
            i_stop = 100000

        assert i_start >= 0, "I don't like your jokes"
        assert i_stop <= 100000, "I don't like your jokes"

        if channel_indexes is None:
            nb_chan = 16
        else:
            nb_chan = len(channel_indexes)
        raw_signals = np.zeros((i_stop - i_start, nb_chan), dtype='int16')
        return raw_signals

    def _spike_count(self, block_index, seg_index, unit_index):
        # Must return the number of spikes for a given (block_index, seg_index, unit_index)
        # we are lucky: our units all have the same number of spikes!!
        # it is not always the case
        nb_spikes = 20
        return nb_spikes

    def _get_spike_timestamps(self, block_index, seg_index, unit_index, t_start, t_stop):
        # In our IO, timestamps are internally coded as 'int64' and they
        # represent sample indexes into the 10 kHz signals
        # we are lucky: spikes have the same discharge in all segments!!
        # incredible neuron!! This is not always the case

        # the same t_start/t_stop clipping must be used in _get_spike_raw_waveforms()

        ts_start = (self._segment_t_start(block_index, seg_index) * 10000)

        spike_timestamps = np.arange(0, 10000, 500) + ts_start

        if t_start is not None or t_stop is not None:
            # restrict spikes to the given limits (in seconds)
            lim0 = int(t_start * 10000)
            lim1 = int(t_stop * 10000)
            mask = (spike_timestamps >= lim0) & (spike_timestamps <= lim1)
            spike_timestamps = spike_timestamps[mask]

        return spike_timestamps

    def _rescale_spike_timestamp(self, spike_timestamps, dtype):
        # must rescale a particular spike_timestamps to seconds
        # with a fixed dtype so the user can choose the precision they want.
        spike_times = spike_timestamps.astype(dtype)
        spike_times /= 10000.  # because 10kHz
        return spike_times

    def _get_spike_raw_waveforms(self, block_index, seg_index, unit_index, t_start, t_stop):
        # this must return a 3D numpy array (nb_spike, nb_channel, nb_sample)
        # in the original dtype
        # this must be as fast as possible.
        # the same t_start/t_stop clipping must be used in _get_spike_timestamps()

        # If waveforms are not supported by the
        # IO then _get_spike_raw_waveforms must return None

        # In our IO waveforms come from all channels
        # they are int16
        # conversion to real units is done with self.header['unit_channels']
        # Here we have a realistic case: all waveforms are only noise.
        # it is not always the case
        # we have 20 spikes with a sweep of 50 samples (5 ms)

        np.random.seed(2205)  # a magic number (my birthday)
        waveforms = np.random.randint(low=-2 ** 4, high=2 ** 4, size=20 * 50, dtype='int16')
        waveforms = waveforms.reshape(20, 1, 50)
        return waveforms

    def _event_count(self, block_index, seg_index, event_channel_index):
        # event and spike are very similar
        # we have 2 event channels
        if event_channel_index == 0:
            # event channel
            return 6
        elif event_channel_index == 1:
            # epoch channel
            return 10

    def _get_event_timestamps(self, block_index, seg_index, event_channel_index, t_start, t_stop):
        # the main difference between spike channels and event channels
        # is that here we have 3 numpy arrays: timestamps, durations, labels
        # durations must be None for 'event'
        # labels must have dtype='U'

        # in our IO, events are directly coded in seconds
        seg_t_start = self._segment_t_start(block_index, seg_index)
        if event_channel_index == 0:
            timestamp = np.arange(0, 6, dtype='float64') + seg_t_start
            durations = None
            labels = np.array(['trigger_a', 'trigger_b'] * 3, dtype='U12')
        elif event_channel_index == 1:
            timestamp = np.arange(0, 10, dtype='float64') + .5 + seg_t_start
            durations = np.ones((10), dtype='float64') * .25
            labels = np.array(['zoneX'] * 5 + ['zoneZ'] * 5, dtype='U12')

        if t_start is not None:
            keep = timestamp >= t_start
            timestamp, labels = timestamp[keep], labels[keep]
            if durations is not None:
                durations = durations[keep]

        if t_stop is not None:
            keep = timestamp <= t_stop
            timestamp, labels = timestamp[keep], labels[keep]
            if durations is not None:
                durations = durations[keep]

        return timestamp, durations, labels

    def _rescale_event_timestamp(self, event_timestamps, dtype):
        # must rescale a particular event_timestamps to seconds
        # with a fixed dtype so the user can choose the precision they want.

        # really easy here because in our case it is already seconds
        event_times = event_timestamps.astype(dtype)
        return event_times

    def _rescale_epoch_duration(self, raw_duration, dtype):
        # really easy here because in our case it is already seconds
        durations = raw_duration.astype(dtype)
        return durations
# -*- coding: utf-8 -*-
"""
neo.io has been split into a 2-level API:
  * neo.io: this API gives Neo objects
  * neo.rawio: this API gives raw data as they are in the files.

Developers are encouraged to use neo.rawio.

Once that is done, the neo.io class is obtained automagically with
the following kind of code.

Author: sgarcia

"""

from neo.io.basefromrawio import BaseFromRaw
from neo.rawio.examplerawio import ExampleRawIO


class ExampleIO(ExampleRawIO, BaseFromRaw):
    name = 'example IO'
    description = "Fake IO"

    # This is an important choice when there are several channels.
    #   'split-all' : one AnalogSignal per channel
    #   'group-by-same-units' : one 2D AnalogSignal for each group of channels with the same units
    _prefered_signal_group_mode = 'group-by-same-units'

    def __init__(self, filename=''):
        ExampleRawIO.__init__(self, filename=filename)
        BaseFromRaw.__init__(self, filename)
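
To close the loop, reading through the wrapped class could look like the following usage sketch (assuming the ExampleIO class above, which ships with Neo):

from neo.io import ExampleIO

reader = ExampleIO(filename='itisafake.nof')
blocks = reader.read()        # list of Block objects
block = reader.read_block()   # or just the first Block
print(block.segments[0].analogsignals[0].shape)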