Created
June 5, 2025 21:01
-
-
Save rarensu/e46fc09936898cc29236563c67eb51cf to your computer and use it in GitHub Desktop.
A tiny demo of what subtasks might be like for agentic AI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# %% | |
import numpy | |
import time | |
verbose = False | |
class context: | |
def __init__(self, kind=None): | |
self.kind = kind | |
def __str__(self): | |
if verbose: | |
return f"<context kind={self.kind}>\n</context>\n" | |
else: | |
return "" | |
def copy(self): | |
return context(self.kind) | |
class message(context): | |
def __init__(self, text): | |
super().__init__("message") | |
self.text = text | |
def __str__(self): | |
if verbose: | |
return f"<context kind={self.kind}>\n{self.text}\n</context>\n" | |
else: | |
return f"{self.text}\n" | |
def copy(self): | |
return message(self.text) | |
class file(context): | |
def __init__(self, name): | |
super().__init__("file") | |
self.name = name | |
def __str__(self): | |
with open(self.name, 'r') as f: | |
data = f.read() | |
if verbose: | |
return f"<context kind={self.kind}>\n{data}\n</context>\n" | |
else: | |
return f"{data}\n" | |
def copy(self): | |
return file(self.name) | |
class log(context): | |
def __init__(self, text): | |
super().__init__("log") | |
self.text = text | |
self.timestamp = time.time() # Placeholder for timestamp, if needed | |
def __str__(self): | |
if verbose: | |
return f"<log timestamp={self.timestamp}>\n{self.text}\n</log>\n" | |
else: | |
return "" | |
def copy(self): | |
return log(self.text) | |
class stream: | |
def __init__(self, items=None, id=0): | |
if items is not None: | |
self.items = numpy.array(items, dtype=object) | |
else: | |
self.items = numpy.array([], dtype=object) | |
self.id = id | |
def __str__(self): | |
result = "" | |
if verbose: | |
result += f"<stream id={self.id}>\n" | |
for item in self.items: | |
result += str(item) | |
if verbose: | |
result += "</stream>\n" | |
return result | |
def copy(self): | |
return stream([item.copy() for item in self.items], self.id+1) | |
def append(self, item): | |
if isinstance(item, context): | |
self.items = numpy.append(self.items, item) | |
else: | |
raise TypeError("Item must be of type 'context'") | |
def extend(self, items): | |
if not items: | |
raise ValueError("Items cannot be empty") | |
if not all(isinstance(item, context) for item in items): | |
raise TypeError("All items must be of type 'context'") | |
self.items = numpy.extend(self.items, items) | |
# %% | |
# stream to file. | |
import os | |
multiple_streams = [] | |
this_stream = stream() | |
while True: | |
try: | |
text = input("Enter a message or command:") | |
except Exception as e: | |
text = f"An error occurred: {e}" | |
except KeyboardInterrupt: | |
print("\nExiting...") | |
break | |
except EOFError: | |
print("\nEnd of input detected. Exiting...") | |
break | |
this_stream.append(log(text)) | |
if text.strip() == "": | |
text2 = input("did you want to quit? send another empty line to confirm.") | |
if text2.strip() == "": | |
break | |
elif text.startswith("/"): | |
# Handle commands | |
command = text[1:].split()[0] | |
if command == "fork": | |
this_stream.append(log(f"Forked new stream with id={this_stream.id}.")) | |
this_stream.append(message(text)) | |
#items before forking | |
new_stream = this_stream.copy() | |
multiple_streams.append(this_stream) | |
this_stream = new_stream | |
elif command == "exit": | |
try: | |
old_id=this_stream.id | |
this_stream = multiple_streams.pop() | |
# items after returning | |
this_stream.append(log("Returned from stream id={old_id}.")) | |
this_stream.append(message(text)) | |
except IndexError: | |
break | |
elif command == "verbose": | |
verbose = not verbose | |
this_stream.append(log(f"Verbose mode set to {verbose}.")) | |
else: | |
this_stream.append(log(f"Unknown command: {command}")) | |
elif text.startswith("@"): | |
filename = text[1:].strip() | |
try: | |
if os.path.exists(filename): | |
with open(filename, 'r') as f: | |
_ = f.read() | |
except FileNotFoundError: | |
print(f"File '{filename}' not found.") | |
except Exception as e: | |
print(f"An error occurred while reading the file '{filename}': {e}") | |
else: | |
item = file(filename) | |
this_stream.append(item) | |
else: | |
item = message(text) | |
this_stream.append(item) | |
with open("stream.txt", "w") as f: | |
f.write(str(this_stream)) | |
del text | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment