Skip to content

Instantly share code, notes, and snippets.

@wmvanvliet
Last active July 16, 2024 07:20
Show Gist options
  • Save wmvanvliet/b3670aa1339304b7e21059cef8b4c3a3 to your computer and use it in GitHub Desktop.
Save wmvanvliet/b3670aa1339304b7e21059cef8b4c3a3 to your computer and use it in GitHub Desktop.
Python code for interfacing with the hardware in the MEG core at Aalto University.
"""Interface between python and the MEG hardware at Aalto's MEG core.
This is intended to be used in combination with PsychoPy to perform classic experiments
involving simple events and responses.
Example that presents a visual stimulus, sends a trigger code to the MEG and waits for a
button response with one of the response pads:
>>> from psychopy import core, visual
>>> from aalto_meg import AaltoMEG
>>> meg = AaltoMeg()
>>> window = visual.Window(size=(800, 600), fullscr=False, color="#696969")
>>> stim = visual.TextStim(window, color="black", text="HELLO", units="pix", height=40)
>>> stim.draw()
>>> window.flip()
>>> meg.send_trigger_code(1)
>>> response = meg.wait_for_button_press(timeout=10)
>>> print("Button response:", response)
>>> window.close()
>>> code.quit()
In a scenario like the example above, the delay between the trigger code being received
and the ProPIXX projector actually drawing the stimulus on the screen is roughly 10 ms.
Authors: Marijn van Vliet ([email protected])
Laura Rautiainen ([email protected])
"""
import random
import nidaqmx
from nidaqmx.constants import AcquisitionType, LineGrouping
from nidaqmx.errors import DaqError
from psychopy import core, parallel
# These are specific to the stimulus-pc in the MEG lab
LPT1_ADDRESS = 0x6FF8 # address of LPT1 port
LEFT_BUTTON_LINE = 25 # left response pad is on line 25 (zero indexed)
RIGHT_BUTTON_LINE = 26 # right response pad in on line 26 (zero indexed)
class AaltoMEG:
"""Interface to the MEG hardware at Aalto's MEG core.
Parameters
----------
fake : bool
When this is set to ``True``, don't connect to any hardware but "fake" all the
responses. This is useful for testing things on your own machine.
"""
def __init__(self, fake=False):
self.fake = fake
if fake:
# Don't connect to actual hardware
return
self.parallel_port = parallel.ParallelPort(address=LPT1_ADDRESS)
self.parallel_port.setData(0)
def send_trigger_code(self, code):
"""Send a trigger code to the MEG status channel.
The trigger code ends up in the STI101 channel, which will jump from 0 to the
desired code, stay there for 3 ms (about 3 samples) and then jump back to 0.
For this reason, sending a trigger code of `0` is not possible. Also, the
maximum possible trigger code is 127.
Parameters
----------
code : int
Trigger code to send. The code needs to be in the range 1-127.
"""
if not (1 <= code <= 127):
raise ValueError("Trigger codes need to be in the range 1-127.")
if self.fake:
# Don't actually send anything, but pretend we do.
core.wait(0.003)
return
self.parallel_port.setData(code)
core.wait(0.003)
self.parallel_port.setData(0)
def wait_for_button_press(self, timeout=None):
"""Wait until the participant has pressed one of the response buttons.
Parameters
----------
timeout : float
The number of seconds to wait for a response. When this time has elapsed,
this function returns ``None`` indicating no response was received. You can
specify ``timeout=None`` to wait indefinitely.
Returns
-------
response : str | None
"left": Left button
"right": Right button
None : No response
"""
if self.fake:
# Fake a random button press.
core.wait(0.5 + 0.2 * random.rand())
return random.choice(["left", "right"])
di_lines = f"Dev1/port0/line{LEFT_BUTTON_LINE}:{RIGHT_BUTTON_LINE}"
with nidaqmx.Task() as task:
# Ask the DAQ to detect on-flanks
task.di_channels.add_di_chan(
di_lines, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES
)
task.timing.cfg_change_detection_timing(
rising_edge_chan=di_lines,
sample_mode=AcquisitionType.FINITE,
num_samples=2, # we read two samples to cover the up-flank
)
# Until we reach timeout, wait for the next up-flank
clock = core.Clock()
while timeout is None or timeout > 0:
if timeout is not None:
timeout = max(timeout - clock.getTime(), 0)
try:
button = task.read(timeout=timeout)
if button & (1 << LEFT_BUTTON_LINE):
return "left"
elif button & (1 << RIGHT_BUTTON_LINE):
return "right"
except DaqError as e:
if e.error_code == -200284: # timeout reached
return None
else:
raise e
# We should probably have returned from the function in the while-loop above,
# but in the off-chance we didn't, we have reached timeout.
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment