Skip to content

Instantly share code, notes, and snippets.

@Antman261
Created February 24, 2025 01:04
Show Gist options
  • Save Antman261/3cc4e412de8b2d305c10497bf0786336 to your computer and use it in GitHub Desktop.
Save Antman261/3cc4e412de8b2d305c10497bf0786336 to your computer and use it in GitHub Desktop.
import json
from typing import Callable, TypedDict
from pathlib import Path
from tempfile import gettempdir
from talon import cron
Channel = TypedDict('Channel', {
'dispatch': Callable[[dict], None],
'handle_event': Callable[[dict], None],
'app_events': Path,
})
watch_job = None
channels: dict[str, Channel] = {}
def init_channel(name: str, handle_event: Callable[[dict], None] | None = None) -> Callable[[dict], None]:
    """Return the dispatch function for channel ``name``, registering it on first use.

    If ``channels`` already holds an entry for ``name``, that entry's
    ``dispatch`` callable is returned and ``handle_event`` is not used.
    Otherwise the channel is created through ``_register_channel``.
    """
    existing = channels.get(name)
    if not existing:
        return _register_channel(name=name, handle_event=handle_event)
    return existing['dispatch']
def dispatch_event(channel: str, event: dict) -> None:
    """Pass ``event`` to the dispatch function of the channel named ``channel``.

    Raises:
        KeyError: if no channel with that name is present in ``channels``.
    """
    send = channels[channel]['dispatch']
    send(event)
def _init_directory(name):
    """Create the two event directories for channel ``name`` and return them.

    Returns:
        A ``(talon_events, app_events)`` tuple of paths, both located under
        the directory returned by ``_get_channel_dirpath(name)``.
    """
    base = _get_channel_dirpath(name)
    talon_events, app_events = base / "talon-events", base / "app-events"
    for directory in (talon_events, app_events):
        # TODO: this mode was marked "Fix me" by the original author.
        directory.mkdir(mode=0o777, parents=True, exist_ok=True)
    return (talon_events, app_events)
def _register_channel(name, handle_event):
    """Create channel ``name``, store it in ``channels`` and return its dispatcher.

    Args:
        name: Channel name; passed to ``_init_directory`` to obtain the
            channel's two event directories.
        handle_event: Callback stored on the channel for incoming events,
            or ``None`` if the channel has no handler.

    Returns:
        The ``dispatch`` function, which writes each event it is given to a
        new numbered JSON file in the channel's ``talon-events`` directory.
    """
    global watch_job
    talon_events, app_events = _init_directory(name)
    # Counter for outgoing file names; it starts at 0 on every registration.
    event_number: int = 0

    def dispatch(event):
        """Serialise ``event`` as JSON into the next ``e-<n>.json`` file."""
        nonlocal event_number
        file = talon_events / f"e-{event_number}.json"
        file.write_text(data=json.dumps(event))
        file.chmod(0o777)  # TODO: marked "Fix me" by the original author
        event_number += 1

    channels[name] = Channel(dispatch=dispatch,
                             app_events=app_events,
                             handle_event=handle_event)
    # One polling job serves all channels; start it the first time a channel
    # with a handler is registered.  Identity checks are used because None is
    # a singleton and "==" could be overridden by the job object.
    if watch_job is None and handle_event is not None:
        watch_job = cron.interval("100ms", _process_events)
    return dispatch
def _process_events():
    """Deliver pending incoming events of every channel to its handler.

    For each channel in ``channels`` that has a ``handle_event`` callback,
    the entries of its ``app_events`` directory are processed in sorted path
    order: each file is parsed as JSON, passed to the handler, and then
    deleted so it is handled only once.  Channels registered without a
    handler are skipped.
    """
    for channel in channels.values():
        handler = channel['handle_event']
        if handler is None:
            # Channel was registered without a handler: nothing to call.
            continue
        for event_file in sorted(channel['app_events'].iterdir()):
            # json.load() needs an object with .read(); a Path has none,
            # so read the file's text and parse that instead.
            event = json.loads(event_file.read_text())
            handler(event)
            # Consume the event; otherwise every poll would replay it.
            event_file.unlink(missing_ok=True)
def _get_channel_dirpath(name: str):
    """Return the path ``<system temp dir>/<name>`` for channel ``name``."""
    temp_root = Path(gettempdir())
    return temp_root.joinpath(name)
import datetime
from talon import speech_system, actions
from .fsio_events import init_channel
# Obtain the dispatch function for the 'talon-tray' channel at import time.
# No handle_event callback is passed, so this channel has no event handler.
dispatch_event = init_channel('talon-tray')
def on_phrase(phrase):
    """Dispatch a ``PHRASE_UTTERED`` event for ``phrase``.

    The value of ``phrase.get("text")`` is passed through
    ``actions.user.history_transform_phrase_text``; when that returns
    ``None`` nothing is dispatched.  ``occurredAt`` is the current time as
    a POSIX timestamp multiplied by 1000.
    """
    text = actions.user.history_transform_phrase_text(phrase.get("text"))
    if text is None:
        return
    occurred_at = datetime.datetime.now().timestamp() * 1000
    dispatch_event({"type": "PHRASE_UTTERED", "phrase": text, "occurredAt": occurred_at})
# Register on_phrase as the callback for speech_system "phrase" events.
speech_system.register("phrase", on_phrase)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment