Adding support for a new file format#

If you would like to use Neo with a file format that is not yet supported, you have two options:

  1. Make an enhancement request (see Reporting bugs, requesting new features). There is no guarantee that a Neo developer will have the time to work on this, but the more information you can give about the format, and the more people upvote your request, the more likely it will be implemented.

  2. Implement it yourself. If you’d like to try this approach, first read Contributing to Neo development, then return and read the rest of this page.

There are two ways to implement a new IO module:

  1. by directly adding a new IO class in a module within neo.io: the reader/writer will deal directly with Neo objects;

  2. by adding a RawIO class in a module within neo.rawio: the reader should work with raw buffers from the file and provide some internal headers for the scale/units/name/… You can then generate an IO module simply by inheriting from your RawIO class and from neo.io.BaseFromRaw.

For read-only classes, we encourage you to write a RawIO class because it allows slice reading, and is generally much quicker and easier than implementing a full IO class. For read/write classes you can mix the two levels: neo.rawio for reading and neo.io for writing.

Recipe to develop an IO module for a new data format#

  1. Fully understand the object model. If in doubt ask the mailing list.

  2. Fully understand neo.rawio.examplerawio, which is a fake IO module to explain the API. If in doubt ask the list.

  3. Copy/paste examplerawio.py and choose clear file and class names for your IO.

  4. implement all methods that raise NotImplementedError in neo.rawio.baserawio. Return None when the object is not supported (e.g., spikes, waveforms)

  5. Write good docstrings. List dependencies, including minimum version numbers.

  6. Add your class to neo.rawio.__init__. Keep imports inside try/except for dependency reasons.

  7. Create a class in neo/io/

  8. Add your class to neo.io.__init__. Keep imports inside try/except for dependency reasons.

  9. Create an account at https://gin.g-node.org and deposit files in NeuralEnsemble/ephy_testing_data.

  10. Write tests in neo/rawio/test_xxxxxrawio.py. You must at least pass the standard tests (inherited from BaseTestRawIO). See test_examplerawio.py

  11. Write a similar test in neo.tests/iotests/test_xxxxxio.py. See test_exampleio.py

  12. Make a pull request when all tests pass.

Additional suggestions#

  • If your IO supports several versions of a format (like ABF1, ABF2), upload to the gin.g-node.org test file repository all file versions possible (for test coverage).

  • neo.core.Block.create_many_to_one_relationship() offers a utility to complete the hierarchy when all one-to-many relationships have been created.

  • In the docstring, explain where you obtained the file format specification, especially if it is a closed one.

  • If your IO is based on a database mapper, keep in mind that the returned object MUST be detached,

    because this object can be written to another url for copying.

Tests#

neo.rawio.tests.common_rawio_test.BaseTestRawIO and neo.test.io.commun_io_test.BaseTestIO provide standard tests. To use these you need to upload some sample data files at gin-gnode. They will be publicly accessible for testing Neo. These tests:

  • check compliance with the schema: hierarchy, attribute types, …

  • for IO modules able to both write and read data, compare a generated dataset with the same data after a write/read cycle.

The test scripts download all files from gin-gnode and store them locally in a temporary directory. Subsequent test runs use the previously downloaded files, rather than trying to download them each time.

Each test must have at least one class that inherits BaseTestRawIO and that has 3 attributes:

  • rawioclass: the class

  • entities_to_download: a list of files or directories to download

  • entities_to_test: a list of files (or directories) to be tested one by one

Here is an example test script taken from the distribution: test_axonrawio.py:

import unittest

from neo.rawio.axonrawio import AxonRawIO

from neo.test.rawiotest.common_rawio_test import BaseTestRawIO


class TestAxonRawIO(
    BaseTestRawIO,
    unittest.TestCase,
):
    rawioclass = AxonRawIO
    entities_to_test = [
        "axon/File_axon_1.abf",  # V2.0
        "axon/File_axon_2.abf",  # V1.8
        "axon/File_axon_3.abf",  # V1.8
        "axon/File_axon_4.abf",  # 2.0
        "axon/File_axon_5.abf",  # V.20
        "axon/File_axon_6.abf",  # V.20
        "axon/File_axon_7.abf",  # V2.6
        "axon/test_file_edr3.abf",  # EDR3
    ]
    entities_to_download = ["axon"]

    def test_read_raw_protocol(self):
        reader = AxonRawIO(filename=self.get_local_path("axon/File_axon_7.abf"))
        reader.parse_header()

        reader.read_raw_protocol()


if __name__ == "__main__":
    unittest.main()

Logging#

All IO classes by default have logging using the standard logging module: already set up. The logger name is the same as the fully qualified class name, e.g. neo.io.nixio.NixIO. The class.logger attribute holds the logger for easy access.

There are generally 3 types of situations in which an IO class should use a logger:

  • Recoverable errors with the file that the users need to be notified about. In this case, please use logger.warning() or logger.error(). If there is an exception associated with the issue, you can use logger.exception() in the exception handler to automatically include a backtrace with the log. By default, all users will see messages at this level, so please restrict it only to problems the user absolutely needs to know about.

  • Informational messages that advanced users might want to see in order to get some insight into the file. In this case, please use logger.info().

  • Messages useful to developers to fix problems with the io class. In this case, please use logger.debug().

A log handler is automatically added to neo, so please do not use your own handler. Please use the class.logger attribute for accessing the logger inside the class rather than logging.getLogger(). Please do not log directly to the root logger (e.g. logging.warning()), use the class’s logger instead (class.logger.warning()). In the tests for the io class, if you intentionally test broken files, please disable logs by setting the logging level to 100.

ExampleIO#

class neo.rawio.ExampleRawIO(filename: str | Path = '')#

Class for “reading” fake data from an imaginary file.

For the user, it gives access to raw data (signals, event, spikes) as they are in the (fake) file int16 and int64.

For a developer, it is just an example showing guidelines for someone who wants to develop a new IO module.

Two rules for developers:
This fake IO:
  • has 2 blocks

  • blocks have 2 and 3 segments

  • has 2 signals streams of 8 channels each (sample_rate = 10000) so 16 channels in total

  • has 3 spike_channels

  • has 2 event channels: one has type=event, the other has type=epoch

Usage:
>>> import neo.rawio
>>> r = neo.rawio.ExampleRawIO(filename='itisafake.nof')
>>> r.parse_header()
>>> print(r)
>>> raw_chunk = r.get_analogsignal_chunk(block_index=0, seg_index=0,
                    i_start=0, i_stop=1024,  channel_names=channel_names)
>>> float_chunk = reader.rescale_signal_raw_to_float(raw_chunk, dtype='float64',
                    channel_indexes=[0, 3, 6])
>>> spike_timestamp = reader.spike_timestamps(spike_channel_index=0,
                    t_start=None, t_stop=None)
>>> spike_times = reader.rescale_spike_timestamp(spike_timestamp, 'float64')
>>> ev_timestamps, _, ev_labels = reader.event_timestamps(event_channel_index=0)
class neo.io.ExampleIO(filename='')#

Here are the entire files:

"""
ExampleRawIO is a class of a fake example.
This is to be used when coding a new RawIO.


Rules for creating a new class:
  1. Step 1: Create the main class
    * Create a file in **neo/rawio/** that ends with "rawio.py"
    * Create the class that inherits from BaseRawIO
    * copy/paste all methods that need to be implemented.
    * code hard! The main difficulty is `_parse_header()`.
      In short you have to create a mandatory dict that
      contains channel information::

            self.header = {}
            self.header['nb_block'] = 2
            self.header['nb_segment'] = [2, 3]
            self.header['signal_streams'] = signal_streams
            self.header['signal_channels'] = signal_channels
            self.header['spike_channels'] = spike_channels
            self.header['event_channels'] = event_channels

  2. Step 2: RawIO test:
    * create a file in neo/rawio/tests with the same name with "test_" prefix
    * copy paste neo/rawio/tests/test_examplerawio.py and do the same

  3. Step 3: Create the neo.io class with the wrapper
    * Create a file in neo/io/ that ends with "io.py"
    * Create a class that inherits both your RawIO class and the BaseFromRaw class
    * copy/paste from neo/io/exampleio.py

  4.Step 4: IO test
    * create a file in neo/test/iotest with the same name as previously with "test_" prefix
    * copy/paste from neo/test/iotest/test_exampleio.py

"""

from __future__ import annotations

import numpy as np
from pathlib import Path

from .baserawio import (
    BaseRawIO,
    _signal_channel_dtype,
    _signal_stream_dtype,
    _spike_channel_dtype,
    _event_channel_dtype,
)


class ExampleRawIO(BaseRawIO):
    """
    Class for "reading" fake data from an imaginary file.

    For the user, it gives access to raw data (signals, event, spikes) as they
    are in the (fake) file int16 and int64.

    For a developer, it is just an example showing guidelines for someone who wants
    to develop a new IO module.

    Two rules for developers:
      * Respect the :ref:`section-neo-rawio-API`
      * Follow the guidelines in :ref:`section-raw-io-recipe`

    This fake IO:
        * has 2 blocks
        * blocks have 2 and 3 segments
        * has  2 signals streams  of 8 channels each (sample_rate = 10000) so 16 channels in total
        * has 3 spike_channels
        * has 2 event channels: one has *type=event*, the other has
          *type=epoch*


    Usage:
        >>> import neo.rawio
        >>> r = neo.rawio.ExampleRawIO(filename='itisafake.nof')
        >>> r.parse_header()
        >>> print(r)
        >>> raw_chunk = r.get_analogsignal_chunk(block_index=0, seg_index=0,
                            i_start=0, i_stop=1024,  channel_names=channel_names)
        >>> float_chunk = reader.rescale_signal_raw_to_float(raw_chunk, dtype='float64',
                            channel_indexes=[0, 3, 6])
        >>> spike_timestamp = reader.spike_timestamps(spike_channel_index=0,
                            t_start=None, t_stop=None)
        >>> spike_times = reader.rescale_spike_timestamp(spike_timestamp, 'float64')
        >>> ev_timestamps, _, ev_labels = reader.event_timestamps(event_channel_index=0)

    """

    extensions = ["fake"]
    rawmode = "one-file"

    def __init__(self, filename: str | Path = ""):
        BaseRawIO.__init__(self)
        # note that this filename is ued in self._source_name
        self.filename = filename

    def _source_name(self):
        # this function is used by __repr__
        # for general cases self.filename is good
        # But for URL you could mask some part of the URL to keep
        # the main part.
        return self.filename

    def _parse_header(self):
        # This is the central part of a RawIO
        # we need to collect from the original format all
        # information required for fast access
        # at any place in the file
        # In short `_parse_header()` can be slow but
        # `_get_analogsignal_chunk()` needs to be as fast as possible

        # create fake signal streams information
        signal_streams = []
        for c in range(2):
            name = f"stream {c}"
            stream_id = c
            signal_streams.append((name, stream_id))
        signal_streams = np.array(signal_streams, dtype=_signal_stream_dtype)

        # create fake signal channels information
        # This is mandatory!!!!
        # gain/offset/units are really important because
        # the scaling to real value will be done with that
        # The real signal will be evaluated as `(raw_signal * gain + offset) * pq.Quantity(units)`
        signal_channels = []
        for c in range(16):
            ch_name = f"ch{c}"
            # our channel id is c+1 just for fun
            # Note that chan_id should be related to
            # original channel id in the file format
            # so that the end user should not be confused when reading datasets
            chan_id = c + 1
            sr = 10000.0  # Hz
            dtype = "int16"
            units = "uV"
            gain = 1000.0 / 2**16
            offset = 0.0
            # stream_id indicates how to group channels
            # channels inside a "stream" share the same characteristics
            #  (sampling rate/dtype/t_start/units/...)
            stream_id = str(c // 8)
            signal_channels.append((ch_name, chan_id, sr, dtype, units, gain, offset, stream_id))
        signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)

        # A stream can contain signals with different physical units.
        # Here, the two last channels will have different units (pA)
        # Since AnalogSignals must have consistent units across channels,
        # this stream will be split in 2 parts on the neo.io level and finally 3 AnalogSignals
        # will be generated per Segment.
        signal_channels[-2:]["units"] = "pA"

        # create fake unit channels
        # This is mandatory!!!!
        # Note that if there is no waveform at all in the file
        # then wf_units/wf_gain/wf_offset/wf_left_sweep/wf_sampling_rate
        # can be set to any value because _spike_raw_waveforms
        # will return None
        spike_channels = []
        for c in range(3):
            unit_name = f"unit{c}"
            unit_id = f"#{c}"
            wf_units = "uV"
            wf_gain = 1000.0 / 2**16
            wf_offset = 0.0
            wf_left_sweep = 20
            wf_sampling_rate = 10000.0
            spike_channels.append((unit_name, unit_id, wf_units, wf_gain, wf_offset, wf_left_sweep, wf_sampling_rate))
        spike_channels = np.array(spike_channels, dtype=_spike_channel_dtype)

        # creating event/epoch channel
        # This is mandatory!!!!
        # In RawIO epoch and event are dealt with in the same way.
        event_channels = []
        event_channels.append(("Some events", "ev_0", "event"))
        event_channels.append(("Some epochs", "ep_1", "epoch"))
        event_channels = np.array(event_channels, dtype=_event_channel_dtype)

        # fill information into the header dict
        # This is mandatory!!!!!
        self.header = {}
        self.header["nb_block"] = 2
        self.header["nb_segment"] = [2, 3]
        self.header["signal_streams"] = signal_streams
        self.header["signal_channels"] = signal_channels
        self.header["spike_channels"] = spike_channels
        self.header["event_channels"] = event_channels

        # insert some annotations/array_annotations at some place
        # at neo.io level. IOs can add annotations
        # to any object. To keep this functionality with the wrapper
        # BaseFromRaw you can add annotations in a nested dict.

        # `_generate_minimal_annotations()` must be called to generate the nested
        # dict of annotations/array_annotations
        self._generate_minimal_annotations()
        # this pprint lines really help with understanding the nested (and sometimes complicated) dict
        # from pprint import pprint
        # pprint(self.raw_annotations)

        # Until here all mandatory operations for setting up a rawio are implemented.
        # The following lines provide additional, recommended annotations for the
        # final neo objects.
        for block_index in range(2):
            bl_ann = self.raw_annotations["blocks"][block_index]
            bl_ann["name"] = f"Block #{block_index}"
            bl_ann["block_extra_info"] = f"This is the block {block_index}"
            for seg_index in range([2, 3][block_index]):
                seg_ann = bl_ann["segments"][seg_index]
                seg_ann["name"] = f"Seg #{seg_index} Block #{block_index}"
                seg_ann["seg_extra_info"] = f"This is the seg {seg_index} of block {block_index}"
                for c in range(2):
                    sig_an = seg_ann["signals"][c]["nickname"] = f"This stream {c} is from a subdevice"
                    # add some array annotations (8 channels)
                    sig_an = seg_ann["signals"][c]["__array_annotations__"]["impedance"] = np.random.rand(8) * 10000
                for c in range(3):
                    spiketrain_an = seg_ann["spikes"][c]
                    spiketrain_an["quality"] = "Good!!"
                    # add some array annotations
                    num_spikes = self.spike_count(block_index, seg_index, c)
                    spiketrain_an["__array_annotations__"]["amplitudes"] = np.random.randn(num_spikes)

                for c in range(2):
                    event_an = seg_ann["events"][c]
                    if c == 0:
                        event_an["nickname"] = "Miss Event 0"
                        # add some array annotations
                        num_ev = self.event_count(block_index, seg_index, c)
                        event_an["__array_annotations__"]["button"] = ["A"] * num_ev
                    elif c == 1:
                        event_an["nickname"] = "MrEpoch 1"

    def _segment_t_start(self, block_index: int, seg_index: int):
        # this must return a float scaled in seconds
        # this t_start will be shared by all objects in the segment
        # except AnalogSignal
        all_starts = [[0.0, 15.0], [0.0, 20.0, 60.0]]
        return all_starts[block_index][seg_index]

    def _segment_t_stop(self, block_index: int, seg_index: int):
        # this must return a float scaled in seconds
        all_stops = [[10.0, 25.0], [10.0, 30.0, 70.0]]
        return all_stops[block_index][seg_index]

    def _get_signal_size(self, block_index: int, seg_index: int, stream_index: int):
        # We generate fake data in which the two stream signals have the same shape
        # across all segments (10.0 seconds)
        # This is not the case for real data, instead you should return the signal
        # size depending on the block_index and segment_index
        # this must return an int = the number of samples

        # Note that channel_indexes can be ignored for most cases
        # except for the case of several sampling rates.
        return 100000

    def _get_signal_t_start(self, block_index: int, seg_index: int, stream_index: int):
        # This give the t_start of a signal.
        # Very often this is equal to _segment_t_start but not
        # always.
        # this must return a float scaled in seconds

        # Note that channel_indexes can be ignored for most cases
        # except for the case of several sampling rates.

        # Here this is the same.
        # this is not always the case
        return self._segment_t_start(block_index, seg_index)

    def _get_analogsignal_chunk(
        self,
        block_index: int,
        seg_index: int,
        i_start: int | None,
        i_stop: int | None,
        stream_index: int,
        channel_indexes: np.ndarray | list | slice | None,
    ):
        # this must return a signal chunk in a signal stream
        # limited with i_start/i_stop (can be None)
        # channel_indexes can be None (=all channel in the stream) or a list or numpy.array
        # This must return a numpy array 2D (even with one channel).
        # This must return the original dtype. No conversion here.
        # This must be as fast as possible.
        # To speed up this call all preparatory calculations should be implemented
        # in _parse_header().

        # Here we are lucky:  our signals are always zeros!!
        # it is not always the case :)
        # internally signals are int16
        # conversion to real units is done with self.header['signal_channels']

        if i_start is None:
            i_start = 0
        if i_stop is None:
            i_stop = 100000

        if i_start < 0 or i_stop > 100000:
            # some checks
            raise IndexError("I don't like your jokes")

        if channel_indexes is None:
            nb_chan = 8
        elif isinstance(channel_indexes, slice):
            channel_indexes = np.arange(8, dtype="int")[channel_indexes]
            nb_chan = len(channel_indexes)
        else:
            channel_indexes = np.asarray(channel_indexes)
            if any(channel_indexes < 0):
                raise IndexError("bad boy")
            if any(channel_indexes >= 8):
                raise IndexError("big bad wolf")
            nb_chan = len(channel_indexes)

        raw_signals = np.zeros((i_stop - i_start, nb_chan), dtype="int16")
        return raw_signals

    def _spike_count(self, block_index: int, seg_index: int, spike_channel_index: int):
        # Must return the nb of spikes for given (block_index, seg_index, spike_channel_index)
        # we are lucky:  our units have all the same nb of spikes!!
        # it is not always the case
        nb_spikes = 20
        return nb_spikes

    def _get_spike_timestamps(
        self, block_index: int, seg_index: int, spike_channel_index: int, t_start: float | None, t_stop: float | None
    ):
        # In our IO, timestamp are internally coded 'int64' and they
        # represent the index of the signals 10kHz
        # we are lucky: spikes have the same discharge in all segments!!
        # incredible neuron!! This is not always the case

        # the same clip t_start/t_start must be used in _spike_raw_waveforms()

        ts_start = self._segment_t_start(block_index, seg_index) * 10000

        spike_timestamps = np.arange(0, 10000, 500) + ts_start

        if t_start is not None or t_stop is not None:
            # restrict spikes to given limits (in seconds)
            lim0 = int(t_start * 10000)
            lim1 = int(t_stop * 10000)
            mask = (spike_timestamps >= lim0) & (spike_timestamps <= lim1)
            spike_timestamps = spike_timestamps[mask]

        return spike_timestamps

    def _rescale_spike_timestamp(self, spike_timestamps: np.ndarray, dtype: np.dtype):
        # must rescale to seconds, a particular spike_timestamps
        # with a fixed dtype so the user can choose the precision they want.
        spike_times = spike_timestamps.astype(dtype)
        spike_times /= 10000.0  # because 10kHz
        return spike_times

    def _get_spike_raw_waveforms(
        self, block_index: int, seg_index: int, spike_channel_index: int, t_start: float | None, t_stop: float | None
    ):
        # this must return a 3D numpy array (nb_spike, nb_channel, nb_sample)
        # in the original dtype
        # this must be as fast as possible.
        # the same clip t_start/t_start must be used in _spike_timestamps()

        # If there there is no waveform supported in the
        # IO them _spike_raw_waveforms must return None

        # In our IO waveforms come from all channels
        # they are int16
        # conversion to real units is done with self.header['spike_channels']
        # Here, we have a realistic case: all waveforms are only noise.
        # it is not always the case
        # we get 20 spikes with a sweep of 50 (5ms)

        # trick to get how many spike in the slice
        ts = self._get_spike_timestamps(block_index, seg_index, spike_channel_index, t_start, t_stop)
        nb_spike = ts.size

        np.random.seed(2205)  # a magic number (my birthday)
        waveforms = np.random.randint(low=-(2**4), high=2**4, size=nb_spike * 50, dtype="int16")
        waveforms = waveforms.reshape(nb_spike, 1, 50)
        return waveforms

    def _event_count(self, block_index: int, seg_index: int, event_channel_index: int):
        # event and spike are very similar
        # we have 2 event channels
        if event_channel_index == 0:
            # event channel
            return 6
        elif event_channel_index == 1:
            # epoch channel
            return 10

    def _get_event_timestamps(
        self, block_index: int, seg_index: int, event_channel_index: int, t_start: float | None, t_stop: float | None
    ):
        # the main difference between spike channel and event channel
        # is that for event channels we have 3D numpy array (timestamp, durations, labels) where
        # durations must be None for 'event'
        # label must a dtype ='U'

        # in our IO events are directly coded in seconds
        seg_t_start = self._segment_t_start(block_index, seg_index)
        if event_channel_index == 0:
            timestamp = np.arange(0, 6, dtype="float64") + seg_t_start
            durations = None
            labels = np.array(["trigger_a", "trigger_b"] * 3, dtype="U12")
        elif event_channel_index == 1:
            timestamp = np.arange(0, 10, dtype="float64") + 0.5 + seg_t_start
            durations = np.ones((10), dtype="float64") * 0.25
            labels = np.array(["zoneX"] * 5 + ["zoneZ"] * 5, dtype="U12")

        if t_start is not None:
            keep = timestamp >= t_start
            timestamp, labels = timestamp[keep], labels[keep]
            if durations is not None:
                durations = durations[keep]

        if t_stop is not None:
            keep = timestamp <= t_stop
            timestamp, labels = timestamp[keep], labels[keep]
            if durations is not None:
                durations = durations[keep]

        return timestamp, durations, labels

    def _rescale_event_timestamp(self, event_timestamps: np.ndarray, dtype: np.dtype, event_channel_index: int):
        # must rescale to seconds for a particular event_timestamps
        # with a fixed dtype so the user can choose the precision they want.

        # really easy here because in our case it is already in seconds
        event_times = event_timestamps.astype(dtype)
        return event_times

    def _rescale_epoch_duration(self, raw_duration: np.ndarray, dtype: np.dtype, event_channel_index: int):
        # really easy here because in our case it is already in seconds
        durations = raw_duration.astype(dtype)
        return durations
"""
neo.io has been split into a 2-level API:
  * neo.io: this API gives Neo objects
  * neo.rawio: this API gives raw data as they are in files.

Developers are encourage to use neo.rawio.

When this is done the neo.io can be implemented trivially
using code like shown in this file.

Author: sgarcia

"""

from neo.io.basefromrawio import BaseFromRaw
from neo.rawio.examplerawio import ExampleRawIO


class ExampleIO(ExampleRawIO, BaseFromRaw):
    name = "example IO"
    description = "Fake IO"

    # This is an inportant choice when there are several channels.
    #   'split-all' :  1 AnalogSignal each 1 channel
    #   'group-by-same-units' : one 2D AnalogSignal for each group of channel with same units
    _prefered_signal_group_mode = "group-by-same-units"

    def __init__(self, filename=""):
        ExampleRawIO.__init__(self, filename=filename)
        BaseFromRaw.__init__(self, filename)