IO developers’ guide
Guidelines for IO implementation
Recipe to develop an IO module for a new data format:
- Fully understand the object model. See Neo core. If in doubt, ask the mailing list.
- Fully understand neo.io.exampleio. It is a fake IO that explains the API. If in doubt, ask the list.
- Copy/paste exampleio.py and choose clear file and class names for your IO.
- Decide which supported objects and readable objects your IO will deal with. This is the crucial point.
- Implement all read_XXX() methods related to readable objects.
- Optional: If your IO supports reading multiple blocks from one file, implement a read_all_blocks() method.
- Do not forget all lazy and cascade combinations.
- Optional: Support loading lazy objects by implementing a load_lazy_object() method and/or lazy cascading by implementing a load_lazy_cascade() method.
- Write good docstrings. List dependencies, including minimum version numbers.
- Add your class to neo.io.__init__. Keep the import inside try/except for dependency reasons.
- Contact the Neo maintainers to put sample files for testing on the G-Node server (write access is not public).
- Write tests in neo/test/io/test_xxxxxio.py. You must at least pass the standard tests (inherited from BaseTestIO).
- Commit or send a patch only if all tests pass.
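The advice to keep imports inside try/except can be sketched as follows. This is a minimal illustration, not Neo's actual neo.io.__init__ code: the helper register_optional_io, the iolist registry, and the module names are assumptions made for the example, and standard-library stand-ins replace real IO classes so that it runs without Neo.

```python
import importlib

# Hypothetical registry of the IO classes that could be imported.
iolist = []


def register_optional_io(module_name, class_name):
    """Try to import an IO class; skip it quietly if a dependency is missing.

    Returns the class on success and None when the import fails.
    """
    try:
        module = importlib.import_module(module_name)
        io_class = getattr(module, class_name)
    except (ImportError, AttributeError):
        # A missing dependency must not break the import of the whole package.
        return None
    iolist.append(io_class)
    return io_class


# Stand-ins so that the sketch runs anywhere:
# 'json.JSONDecoder' plays the role of an IO class whose dependencies exist,
# 'module_that_is_not_installed' plays the role of a missing dependency.
available = register_optional_io("json", "JSONDecoder")
missing = register_optional_io("module_that_is_not_installed", "SomeIO")
```

With this pattern, a user who lacks one optional dependency can still import the package and use every other IO.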
Miscellaneous
- If your IO supports several versions of a format (like ABF1, ABF2), upload test files for every possible version to the G-Node test file repository (for unit test coverage).
- neo.core.Block.create_many_to_one_relationship() offers a utility to complete the hierarchy when all one-to-many relationships have been created.
- neo.io.tools.populate_RecordingChannel() offers a utility to create all RecordingChannel objects inside a Block, together with their links to AnalogSignal, SpikeTrain, ...
- In the docstring, explain where you obtained the file format specification if it is a closed one.
- If your IO is based on a database mapper, keep in mind that the returned object MUST be detached, because this object can be written to another URL for copying.
Advanced lazy loading
If your IO supports a format that might take a long time to load or require a lot of memory, consider implementing one or both of the following methods to enable advanced lazy loading:
- load_lazy_object(self, obj): This method takes a lazily loaded object and returns the corresponding fully loaded object. It does not set any links of the newly loaded object (e.g. the segment attribute of a SpikeTrain). The information needed to fully load the lazy object should usually be stored in the IO object (e.g. in a dictionary with lazily loaded objects as keys and the address in the file as values).
- load_lazy_cascade(self, address, lazy): This method takes two parameters: the information required by your IO to load an object, and a boolean that indicates whether data objects should be lazily loaded (in the same way as with regular read_XXX() methods). The method should return a loaded object, including all the links for one-to-many and many-to-many relationships (lists of links should be replaced by LazyList objects, see below).
To implement lazy cascading, your read methods need to react when a user calls them with the cascade parameter set to lazy. In this case, you have to replace all the link lists of your loaded objects with instances of neo.io.tools.LazyList. Instead of the actual objects that your IO would load at this point, fill the list with the items that load_lazy_cascade needs to load the object.
Because the links of objects can point to previously loaded objects, you need to cache all loaded objects in the IO. If load_lazy_cascade() is called with the address of a previously loaded object, return the object instead of loading it again. Also, a call to load_lazy_cascade() might require you to load additional objects further up in the hierarchy. For example, if a SpikeTrain is accessed through a Segment, its Unit and the RecordingChannelGroup of the Unit might have to be loaded at that point as well if they have not been accessed before.
Note that you are free to restrict lazy cascading to certain objects. For example, you could use the LazyList only for the analogsignals property of Segment and RecordingChannel objects and load the rest of the file immediately.
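The bookkeeping described above (a dictionary from lazy objects to file addresses, plus a cache of already loaded objects) can be sketched without Neo. All class names here (LazyPlaceholder, LoadedObject, SketchLazyIO) and the dict-based "file" are assumptions for illustration; a real IO would return Neo objects and use neo.io.tools.LazyList for the link lists.

```python
class LazyPlaceholder(object):
    """Hypothetical stand-in for a lazily loaded Neo object (no data yet)."""

    def __init__(self, name):
        self.name = name
        self.data = []


class LoadedObject(object):
    """Hypothetical stand-in for a fully loaded Neo object."""

    def __init__(self, name, data):
        self.name = name
        self.data = data


class SketchLazyIO(object):
    """Toy IO whose 'file' is a dict mapping addresses to raw data."""

    def __init__(self, fake_file):
        self._file = fake_file
        self._lazy_addresses = {}   # lazy object -> address in the file
        self._cache = {}            # address -> object already loaded
        self.load_count = 0         # counts real loads, for illustration

    def read_lazy(self, address):
        # Return a placeholder and remember where its data lives.
        obj = LazyPlaceholder(name=address)
        self._lazy_addresses[obj] = address
        return obj

    def load_lazy_object(self, obj):
        # Return the fully loaded counterpart; links are not set here.
        address = self._lazy_addresses[obj]
        return LoadedObject(obj.name, list(self._file[address]))

    def load_lazy_cascade(self, address, lazy):
        # Return the cached object if this address was loaded before.
        if address in self._cache:
            return self._cache[address]
        self.load_count += 1
        if lazy:
            obj = self.read_lazy(address)
        else:
            obj = LoadedObject(address, list(self._file[address]))
        self._cache[address] = obj
        return obj


reader = SketchLazyIO({"segment0/signal0": [0.1, 0.2, 0.3]})
first = reader.load_lazy_cascade("segment0/signal0", lazy=True)
second = reader.load_lazy_cascade("segment0/signal0", lazy=True)
full = reader.load_lazy_object(first)
```

The second call returns the very same object as the first, so links that point to a previously loaded object stay consistent.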
Tests
neo.test.io.common_io_test.BaseTestIO provides standard tests.
To use these, you need to upload some sample data files to the G-Node portal. They will be publicly accessible for testing Neo.
These tests:
- check the compliance with the schema: hierarchy, attribute types, ...
- check if the IO respects the lazy and cascade keywords.
- for IOs able to both write and read data, compare a generated dataset with the same data after a write/read cycle.
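As a rough idea of what checking the lazy and cascade keywords involves, the sketch below runs a reader through all four combinations. It is not the code of BaseTestIO: fake_read_segment is a hypothetical reader that returns plain dicts instead of Neo objects.

```python
import itertools


def fake_read_segment(lazy=False, cascade=True):
    """Hypothetical reader returning a plain dict instead of a Neo Segment."""
    segment = {"name": "fake segment", "signals": []}
    if cascade:
        # Child objects are only created when cascading is requested.
        data = [] if lazy else [0.0, 0.5, 1.0]
        segment["signals"].append({"data": data, "lazy": lazy})
    return segment


results = {}
for lazy, cascade in itertools.product([False, True], repeat=2):
    seg = fake_read_segment(lazy=lazy, cascade=cascade)
    results[(lazy, cascade)] = seg
    if not cascade:
        # Without cascade, no child object may be attached.
        assert seg["signals"] == []
    elif lazy:
        # Lazy children exist but carry no data.
        assert seg["signals"][0]["data"] == []
    else:
        assert len(seg["signals"][0]["data"]) > 0
```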
The test scripts download all files from the G-Node portal and store them locally in neo/test/io/files_for_tests/.
Subsequent test runs use the previously downloaded files, rather than trying to download them each time.
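The download-once behaviour can be pictured with the following sketch. It is an assumption about the general pattern, not the actual Neo test helper: ensure_local_copy and fake_fetch are hypothetical names, and the "download" just writes a dummy file into a temporary directory.

```python
import os
import tempfile


def ensure_local_copy(filename, local_dir, fetch):
    """Return the local path of filename, calling fetch only if it is missing.

    fetch is a hypothetical callable taking (filename, destination_path).
    """
    if not os.path.isdir(local_dir):
        os.makedirs(local_dir)
    local_path = os.path.join(local_dir, filename)
    if not os.path.exists(local_path):
        fetch(filename, local_path)
    return local_path


fetch_calls = []


def fake_fetch(filename, destination):
    # Stand-in for a real download: records the call and writes a dummy file.
    fetch_calls.append(filename)
    with open(destination, "wb") as handle:
        handle.write(b"dummy content")


cache_dir = os.path.join(tempfile.mkdtemp(), "files_for_tests")
path_first = ensure_local_copy("File_axon_1.abf", cache_dir, fake_fetch)
path_second = ensure_local_copy("File_axon_1.abf", cache_dir, fake_fetch)
```

The second call finds the file on disk and does not fetch it again.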
Here is an example test script taken from the distribution: test_axonio.py:
# -*- coding: utf-8 -*-
"""
Tests of neo.io.axonio
"""

# needed for python 3 compatibility
from __future__ import absolute_import

import sys

try:
    import unittest2 as unittest
except ImportError:
    import unittest

from neo.io import AxonIO
from neo.test.iotest.common_io_test import BaseTestIO


class TestAxonIO(BaseTestIO, unittest.TestCase):
    files_to_test = ['File_axon_1.abf',
                     'File_axon_2.abf',
                     'File_axon_3.abf',
                     'File_axon_4.abf',
                     'File_axon_5.abf',
                     'File_axon_6.abf',
                     ]
    files_to_download = files_to_test
    ioclass = AxonIO


if __name__ == "__main__":
    unittest.main()
Logging
All IO classes have logging set up by default, using the standard logging module.
The logger name is the same as the fully qualified class name, e.g. neo.io.hdf5io.NeoHdf5IO.
The class.logger attribute holds the logger for easy access.
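The naming convention can be illustrated with the standard logging module alone. This sketch does not show how Neo's BaseIO actually creates the logger; the decorator with_class_logger and the class SketchFormatIO are hypothetical names used only to show a logger named after the fully qualified class name and exposed as a class attribute.

```python
import logging


def with_class_logger(cls):
    """Hypothetical class decorator: attach a logger named after the class."""
    cls.logger = logging.getLogger("%s.%s" % (cls.__module__, cls.__name__))
    return cls


@with_class_logger
class SketchFormatIO(object):
    """Stand-in for an IO class; it is not part of Neo."""

    def read_header(self):
        # Use the class logger instead of calling logging.getLogger() again.
        self.logger.debug("reading header")
        return {}


logger_name = SketchFormatIO.logger.name
```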
There are generally 3 types of situations in which an IO class should use a logger:
- Recoverable errors with the file that the users need to be notified about. In this case, please use logger.warning() or logger.error(). If there is an exception associated with the issue, you can use logger.exception() in the exception handler to automatically include a backtrace with the log. By default, all users will see messages at this level, so please restrict it only to problems the user absolutely needs to know about.
- Informational messages that advanced users might want to see in order to get some insight into the file. In this case, please use logger.info().
- Messages useful to developers to fix problems with the IO class. In this case, please use logger.debug().
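The three situations map onto the standard logging calls roughly as in this sketch. SketchTextIO, its logger name, and the read_values method are hypothetical and exist only to show where each level fits; a real IO class would use the logger it already has.

```python
import logging


class SketchTextIO(object):
    """Hypothetical IO class; only the logging calls matter here."""

    logger = logging.getLogger("sketch.SketchTextIO")

    def read_values(self, raw_fields):
        values = []
        # Developer-level detail.
        self.logger.debug("parsing %d fields", len(raw_fields))
        for index, field in enumerate(raw_fields):
            try:
                values.append(float(field))
            except ValueError:
                # Recoverable problem the user must know about; the traceback
                # is attached automatically by logger.exception().
                self.logger.exception("skipping unreadable field %d", index)
        # Insight for advanced users.
        self.logger.info("read %d of %d fields", len(values), len(raw_fields))
        return values


values = SketchTextIO().read_values(["1.5", "oops", "2"])
```

The unreadable field is reported and skipped, and the remaining values are still returned.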
A log handler is automatically added to neo, so please do not use your own handler.
Please use the class.logger attribute for accessing the logger inside the class rather than logging.getLogger().
Please do not log directly to the root logger (e.g. logging.warning()); use the class’s logger instead (class.logger.warning()).
In the tests for the IO class, if you intentionally test broken files, please disable logs by setting the logging level to 100.
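Silencing the logger in a test might look like the sketch below. SketchBrokenFileIO and its logger name are hypothetical stand-ins for a real IO class; only setLevel(100) comes from the advice above. Level 100 lies above logging.CRITICAL (50), so nothing is emitted while the test runs.

```python
import logging
import unittest


class SketchBrokenFileIO(object):
    """Hypothetical IO class that logs an error when it meets a broken file."""

    logger = logging.getLogger("sketch.SketchBrokenFileIO")

    def read(self, content):
        if content is None:
            self.logger.error("file is broken, returning no data")
            return []
        return list(content)


class TestBrokenFile(unittest.TestCase):
    def setUp(self):
        # Remember the previous level so that other tests are not affected.
        self._old_level = SketchBrokenFileIO.logger.level
        SketchBrokenFileIO.logger.setLevel(100)

    def tearDown(self):
        SketchBrokenFileIO.logger.setLevel(self._old_level)

    def test_broken_file_returns_nothing(self):
        self.assertEqual(SketchBrokenFileIO().read(None), [])


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestBrokenFile)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```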
ExampleIO
class neo.io.ExampleIO(filename=None)
Class for “reading” fake data from an imaginary file.
For the user, it generates a Segment or a Block with a sinusoidal AnalogSignal, a SpikeTrain and an EventArray.
For a developer, it is just an example showing guidelines for someone who wants to develop a new IO module.
Two rules for developers:
- Respect the Neo IO API (Details of API)
- Follow Guidelines for IO implementation
Usage:
>>> from neo import io
>>> r = io.ExampleIO(filename='itisafake.nof')
>>> seg = r.read_segment(lazy=False, cascade=True)
>>> print(seg.analogsignals)
[<AnalogSignal(array([ 0.19151945, 0.62399373, 0.44149764, ..., 0.96678374,
...
>>> print(seg.spiketrains)
[<SpikeTrain(array([ -0.83799524, 6.24017951, 7.76366686, 4.45573701,
12.60644415, 10.68328994, 8.07765735, 4.89967804,
...
>>> print(seg.eventarrays)
[<EventArray: TriggerB@9.6976 s, TriggerA@10.2612 s, TriggerB@2.2777 s, TriggerA@6.8607 s, ...
>>> anasig = r.read_analogsignal(lazy=True, cascade=False)
>>> print(anasig._data_description)
{'shape': (150000,)}
>>> anasig = r.read_analogsignal(lazy=False, cascade=False)
Here is the entire file:
# -*- coding: utf-8 -*-
"""
Class for "reading" fake data from an imaginary file.
For the user, it generates a :class:`Segment` or a :class:`Block` with a
sinusoidal :class:`AnalogSignal`, a :class:`SpikeTrain` and an
:class:`EventArray`.
For a developer, it is just an example showing guidelines for someone who wants
to develop a new IO module.
Depends on: scipy
Supported: Read
Author: sgarcia
"""
# needed for python 3 compatibility
from __future__ import absolute_import
# note neo.core needs only numpy and quantities
import numpy as np
import quantities as pq
# but my specific IO can depend on many other packages
try:
    from scipy import stats
except ImportError as err:
    HAVE_SCIPY = False
    SCIPY_ERR = err
else:
    HAVE_SCIPY = True
    SCIPY_ERR = None
# I need to subclass BaseIO
from neo.io.baseio import BaseIO
# to import from core
from neo.core import Segment, AnalogSignal, SpikeTrain, EventArray
# I need to subclass BaseIO
class ExampleIO(BaseIO):
    """
    Class for "reading" fake data from an imaginary file.

    For the user, it generates a :class:`Segment` or a :class:`Block` with a
    sinusoidal :class:`AnalogSignal`, a :class:`SpikeTrain` and an
    :class:`EventArray`.

    For a developer, it is just an example showing guidelines for someone who wants
    to develop a new IO module.

    Two rules for developers:
      * Respect the Neo IO API (:ref:`neo_io_API`)
      * Follow :ref:`io_guiline`

    Usage:
        >>> from neo import io
        >>> r = io.ExampleIO(filename='itisafake.nof')
        >>> seg = r.read_segment(lazy=False, cascade=True)
        >>> print(seg.analogsignals) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        [<AnalogSignal(array([ 0.19151945, 0.62399373, 0.44149764, ..., 0.96678374,
        ...
        >>> print(seg.spiketrains) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        [<SpikeTrain(array([ -0.83799524, 6.24017951, 7.76366686, 4.45573701,
        12.60644415, 10.68328994, 8.07765735, 4.89967804,
        ...
        >>> print(seg.eventarrays) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        [<EventArray: TriggerB@9.6976 s, TriggerA@10.2612 s, TriggerB@2.2777 s, TriggerA@6.8607 s, ...
        >>> anasig = r.read_analogsignal(lazy=True, cascade=False)
        >>> print(anasig._data_description)
        {'shape': (150000,)}
        >>> anasig = r.read_analogsignal(lazy=False, cascade=False)

    """

    is_readable = True  # This class can only read data
    is_writable = False  # write is not supported

    # This class is able to directly or indirectly handle the following objects
    # You can notice that this greatly simplifies the full Neo object hierarchy
    supported_objects = [Segment, AnalogSignal, SpikeTrain, EventArray]

    # This class can return either a Block or a Segment
    # The first one is the default ( self.read )
    # These lists should go from highest object to lowest object because
    # common_io_test assumes it.
    readable_objects = [Segment, AnalogSignal, SpikeTrain]
    # This class is not able to write objects
    writeable_objects = []

    has_header = False
    is_streameable = False

    # This is for GUI stuff: a definition for parameters when reading.
    # This dict should be keyed by object (`Block`). Each entry is a list
    # of tuples. The first entry in each tuple is the parameter name. The
    # second entry is a dict with keys 'value' (for default value),
    # and 'label' (for a descriptive name).
    # Note that if the highest-level object requires parameters,
    # common_io_test will be skipped.
    read_params = {
        Segment: [
            ('segment_duration',
                {'value': 15., 'label': 'Segment size (s.)'}),
            ('num_analogsignal',
                {'value': 8, 'label': 'Number of recording points'}),
            ('num_spiketrain_by_channel',
                {'value': 3, 'label': 'Num of spiketrains'}),
        ],
    }

    # writing is not supported, so no GUI stuff
    write_params = None

    name = 'example'

    extensions = ['nof']

    # mode can be 'file' or 'dir' or 'fake' or 'database'
    # the main case is 'file' but some readers are based on a directory or a database
    # this info is for GUI stuff also
    mode = 'fake'

    def __init__(self, filename=None):
        """
        Arguments:
            filename : the filename

        Note:
            - filename is here just as an example because it will not be taken into account
            - if mode=='dir' the argument should be dirname (See TdtIO)

        """
        BaseIO.__init__(self)
        self.filename = filename
        # Seed so all instances can return the same values
        np.random.seed(1234)

    # Segment reading is supported so I define this:
    def read_segment(self,
                     # the 2 first keyword arguments are imposed by neo.io API
                     lazy=False,
                     cascade=True,
                     # all following arguments are decided by this IO and are free
                     segment_duration=15.,
                     num_analogsignal=4,
                     num_spiketrain_by_channel=3,
                     ):
        """
        Return a fake Segment.

        The self.filename does not matter.

        By default, this IO reads a Segment.

        This is just an example to be adapted to each ClassIO.
        In this case these 3 parameters are taken into account because this function
        returns a generated segment with fake AnalogSignal and fake SpikeTrain.

        Parameters:
            segment_duration : the size in seconds of the segment.
            num_analogsignal : number of AnalogSignal in this segment
            num_spiketrain_by_channel : number of SpikeTrain per channel in this segment

        """
        sampling_rate = 10000.  # Hz
        t_start = -1.

        # time vector for generated signal
        timevect = np.arange(t_start, t_start + segment_duration, 1. / sampling_rate)

        # create an empty segment
        seg = Segment(name='it is a seg from exampleio')

        if cascade:
            # read nested analogsignal
            for i in range(num_analogsignal):
                ana = self.read_analogsignal(lazy=lazy, cascade=cascade,
                                             channel_index=i,
                                             segment_duration=segment_duration,
                                             t_start=t_start)
                seg.analogsignals += [ana]

            # read nested spiketrain
            for i in range(num_analogsignal):
                for _ in range(num_spiketrain_by_channel):
                    sptr = self.read_spiketrain(lazy=lazy, cascade=cascade,
                                                segment_duration=segment_duration,
                                                t_start=t_start, channel_index=i)
                    seg.spiketrains += [sptr]

            # create an EventArray that mimics triggers.
            # note that ExampleIO does not allow direct access to EventArray
            # for that you need read_segment(cascade = True)
            eva = EventArray()
            if lazy:
                # in the lazy case no data are read
                # eva is empty
                pass
            else:
                # otherwise it really contains data
                n = 1000

                # neo.io supports quantities; my vector uses seconds as unit
                eva.times = timevect[(np.random.rand(n) * timevect.size).astype('i')] * pq.s
                # all durations are the same
                eva.durations = np.ones(n) * 500 * pq.ms
                # labels
                labels = []
                for i in range(n):
                    if np.random.rand() > .6:
                        labels.append('TriggerA')
                    else:
                        labels.append('TriggerB')
                eva.labels = np.array(labels)

            seg.eventarrays += [eva]

        seg.create_many_to_one_relationship()
        return seg

    def read_analogsignal(self,
                          # the 2 first keyword arguments are imposed by neo.io API
                          lazy=False,
                          cascade=True,
                          channel_index=0,
                          segment_duration=15.,
                          t_start=-1,
                          ):
        """
        With this IO, an AnalogSignal can be accessed directly with its channel number
        """
        sr = 10000.
        sinus_freq = 3.  # Hz
        # time vector for generated signal:
        tvect = np.arange(t_start, t_start + segment_duration, 1. / sr)

        if lazy:
            anasig = AnalogSignal([], units='V', sampling_rate=sr * pq.Hz,
                                  t_start=t_start * pq.s,
                                  channel_index=channel_index)
            # we add the attribute lazy_shape with the size if loaded
            anasig.lazy_shape = tvect.shape
        else:
            # create analogsignal (sinus of 3 Hz)
            sig = np.sin(2 * np.pi * tvect * sinus_freq + channel_index / 5. * 2 * np.pi) + np.random.rand(tvect.size)
            anasig = AnalogSignal(sig, units='V', sampling_rate=sr * pq.Hz,
                                  t_start=t_start * pq.s,
                                  channel_index=channel_index)

        # for attributes out of neo you can annotate
        anasig.annotate(info='it is a sinus of %f Hz' % sinus_freq)

        return anasig

    def read_spiketrain(self,
                        # the 2 first keyword arguments are imposed by neo.io API
                        lazy=False,
                        cascade=True,
                        segment_duration=15.,
                        t_start=-1,
                        channel_index=0,
                        ):
        """
        With this IO, a SpikeTrain can be accessed directly with its channel number
        """
        # There are 2 possible behaviours for a SpikeTrain:
        # holding many Spike instances or directly holding spike times
        # we choose here the first:
        if not HAVE_SCIPY:
            raise SCIPY_ERR

        num_spike_by_spiketrain = 40
        sr = 10000.

        if lazy:
            times = []
        else:
            times = (np.random.rand(num_spike_by_spiketrain) * segment_duration +
                     t_start)

        # create a spiketrain
        spiketr = SpikeTrain(times, t_start=t_start * pq.s,
                             t_stop=(t_start + segment_duration) * pq.s,
                             units=pq.s,
                             name='it is a spiketrain from exampleio',
                             )

        if lazy:
            # we add the attribute lazy_shape with the size if loaded
            spiketr.lazy_shape = (num_spike_by_spiketrain,)

        # our spiketrains also hold the waveforms:

        # 1 generate a fake spike shape (2d array if trodness >1)
        w1 = -stats.nct.pdf(np.arange(11, 60, 4), 5, 20)[::-1] / 3.
        w2 = stats.nct.pdf(np.arange(11, 60, 2), 5, 20)
        w = np.r_[w1, w2]
        w = -w / max(w)

        if not lazy:
            # in the neo API the waveforms attr is 3 D in case of a tetrode
            # in our case it is a mono electrode so dim 1 is size 1
            waveforms = np.tile(w[np.newaxis, np.newaxis, :], (num_spike_by_spiketrain, 1, 1))
            waveforms *= np.random.randn(*waveforms.shape) / 6 + 1
            spiketr.waveforms = waveforms * pq.mV
        spiketr.sampling_rate = sr * pq.Hz
        spiketr.left_sweep = 1.5 * pq.s

        # for attributes out of neo you can annotate
        spiketr.annotate(channel_index=channel_index)

        return spiketr
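
The read_params attribute in the file above pairs each parameter name with a default value and a label. As a sketch of how a caller such as a GUI could turn that structure into keyword arguments, the hypothetical helper below (default_read_kwargs is not part of Neo) collects the defaults. The dict is keyed by the string "Segment" rather than the Segment class so that the example runs without Neo installed.

```python
def default_read_kwargs(read_params, object_name):
    """Collect the default value of every parameter listed for one object."""
    return dict((name, spec["value"]) for name, spec in read_params[object_name])


# Same layout as ExampleIO.read_params, keyed here by a string instead of
# the Segment class (an assumption made to keep the sketch self-contained).
read_params = {
    "Segment": [
        ("segment_duration", {"value": 15., "label": "Segment size (s.)"}),
        ("num_analogsignal", {"value": 8, "label": "Number of recording points"}),
        ("num_spiketrain_by_channel", {"value": 3, "label": "Num of spiketrains"}),
    ],
}

kwargs = default_read_kwargs(read_params, "Segment")
```

The resulting dict could then be passed on as keyword arguments, for instance r.read_segment(**kwargs) on an ExampleIO instance.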