Last active
January 15, 2023 05:49
-
-
Save attie/be46b006bd35818327458e0bdddaa13d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import asyncio | |
from aramanth import * | |
from ....support.endpoint import * | |
from ....gateware.pads import * | |
from ....gateware.pll import * | |
from ... import * | |
class VideoIkea2WireSubtarget(Elaboratable): | |
def __init__(self, pads, out_fifo): | |
self.d_pad = pads.d_t | |
self.out_fifo = out_fifo | |
def elaborate(self, platform): | |
sys_clk_freq = platform.default_clk_frequency | |
t_period = int(1 + sys_clk_freq * 22.6e-6) | |
t_one = int(1 + sys_clk_freq * 12.2e-6) | |
t_zero = int(1 + sys_clk_freq * 5.4e-6) | |
t_reset_period = int(1 + sys_clk_freq * 86.6e-6) | |
t_reset_lo = int(1 + sys_clk_freq * 26.6e-6) | |
m = Module() | |
d_out = Signal() | |
m.d.comb += [ | |
self.d_pad.oe.eq(d_out), | |
self.d_pad.o.eq(1), | |
d_out.eq(1), | |
] | |
buf_in = Signal(32) | |
buf_out = Signal(27) | |
cyc_ctr = Signal(range(t_reset_period+1)) | |
bit_ctr = Signal(range(buf_in.width+1)) | |
with m.FSM(): | |
with m.State('INIT'): | |
m.d.sync += bit_ctr.eq(0) | |
m.next = 'LOAD' | |
with m.State('LOAD'): | |
m.d.comb += self.out_fifo.r_en.eq(1) | |
with m.If(self.out_fifo.r_rdy): | |
m.d.sync += buf_in.eq(Cat(self.out_fifo.r_data, buf_in[:-8])), | |
with m.If(bit_ctr < buf_in.width - 8): | |
m.d.sync += bit_ctr.eq(bit_ctr + 8) | |
with m.Else(): | |
m.next = 'LATCH' | |
with m.State('LATCH'): | |
m.d.sync += [ | |
bit_ctr.eq(0), | |
cyc_ctr.eq(0), | |
buf_out.eq(Cat(buf_in[:3], buf_in[8:])), | |
] | |
m.next = 'RESET' | |
with m.State('RESET'): | |
with m.If(cyc_ctr < t_reset_lo): | |
m.d.comb += d_out.eq(0) | |
m.d.sync += cyc_ctr.eq(cyc_ctr + 1) | |
with m.Elif(cyc_ctr < t_reset_period): | |
m.d.sync += cyc_ctr.eq(cyc_ctr + 1) | |
with m.Else(): | |
m.d.sync += [ | |
bit_ctr.eq(0), | |
cyc_ctr.eq(0), | |
] | |
m.next = 'SEND' | |
with m.State('SEND'): | |
with m.If(cyc_ctr < t_zero): | |
m.d.comb += d_out.eq(0) | |
m.d.sync += cyc_ctr.eq(cyc_ctr + 1) | |
with m.Elif(cyc_ctr < t_one): | |
m.d.comb += d_out.eq(~buf_out[-1]) | |
m.d.sync += cyc_ctr.eq(cyc_ctr + 1) | |
with m.Elif(cyc_ctr < t_period): | |
m.d.sync += cyc_ctr.eq(cyc_ctr + 1) | |
with m.Elif(bit_ctr < buf_out.width-1): | |
m.d.sync += [ | |
bit_ctr.eq(bit_ctr + 1), | |
cyc_ctr.eq(0), | |
buf_out.eq(Cat(0, buf_out[:-1])), | |
] | |
with m.Else(): | |
m.next = 'INIT' | |
return m | |
class VideoIkea2WireApplet(GlasgowApplet, name='ikea-2wire'): | |
logger = logging.getLogger(__name__) | |
help = 'drive IKEA 2-wire LED chain, 205.325.78' | |
description = """ | |
Output RGB values to the whole chain of LEDs. | |
Connect the LED chain between the selected pin and 0v. | |
Add a 330 Ohm pull resistor (in parallel to the built-in 10 kOhm). | |
""" | |
@classmethod | |
def add_build_arguments(cls, parser, access): | |
super().add_build_arguments(parser, access) | |
access.add_pin_argument(parser, 'd', default=True) | |
@classmethod | |
def build(self, target, args): | |
self.mux_interface = iface = target.multiplexer.claim_interface(self, args) | |
subtarget = iface.add_subtarget(VideoIkea2WireSubtarget( | |
pads=iface.get_pads(args, pins=('d',)), | |
out_fifo=iface.get_out_fifo(), | |
)) | |
return subtarget | |
async def run(self, device, args): | |
return await device.demultiplexer.claim_interface(self, self.mux_interface, args, write_buffer_size=8, pull_high={args.pin_d}) | |
async def interact(self, device, args, chain): | |
while True: | |
# the format is: | |
# 8-bit : green | |
# 8-bit : red | |
# 8-bit : blue | |
# 3-bit : trailer (not required) | |
print('Green') | |
await chain.write([ 0x40, 0x00, 0x00, 0x00 ]) | |
await chain.flush() | |
await asyncio.sleep(0.25) | |
print('Red') | |
await chain.write([ 0x00, 0x40, 0x00, 0x00 ]) | |
await chain.flush() | |
await asyncio.sleep(0.25) | |
print('Blue') | |
await chain.write([ 0x00, 0x00, 0x40, 0x00 ]) | |
await chain.flush() | |
await asyncio.sleep(0.25) | |
print('Off') | |
await chain.write([ 0x00, 0x00, 0x00, 0x00 ]) | |
await chain.flush() | |
await asyncio.sleep(0.25) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment