Last active
August 2, 2017 19:51
-
-
Save haikuginger/e7ef2756e0a81d23eafffbb2c64795ce to your computer and use it in GitHub Desktop.
Tape drive
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from copy import copy | |
from time import sleep | |
class OutOfSpace(Exception):
    """Signals that no room is left for an additional tape."""
class TapeNotFound(Exception):
    """Signals that a requested tape could not be provided."""
class Tape(object):
    """A single fixed-capacity tape holding raw bytes.

    Attributes:
        max_size: Capacity of a tape, in bytes (class-level).
        tape_id: Identifier supplied when the tape is created.
        storage: Zero-filled ``bytearray`` of ``max_size`` bytes.
    """

    max_size = 100

    def __init__(self, tape_id):
        self.tape_id = tape_id
        # Derive the buffer length from max_size so the bounds checks and the
        # actual buffer can never disagree.
        self.storage = bytearray(self.max_size)

    def _check_bounds(self, position, length):
        """Raise ``ValueError`` unless ``length`` bytes at ``position`` fit."""
        # Negative positions must be rejected explicitly: bytearray indexing
        # would otherwise wrap around to the end of the tape without an error.
        if position < 0 or length < 0 or position + length > self.max_size:
            raise ValueError(
                'Range of {} byte(s) at position {} does not fit on tape {} '
                '(capacity {})'.format(
                    length, position, self.tape_id, self.max_size))

    def write(self, position, data):
        """Copy ``data`` onto the tape starting at ``position``.

        Calls the module's ``delay`` helper once per byte written.

        Args:
            position: Index of the first byte to overwrite.
            data: Iterable of byte values with a length.

        Returns:
            The number of bytes written (``len(data)``).

        Raises:
            ValueError: If the range does not fit on the tape.
        """
        self._check_bounds(position, len(data))
        print('Writing data', end='')
        for offset, byte in enumerate(data):
            delay('.', 0.05, newline=False)
            self.storage[position + offset] = byte
        print('done.')
        return len(data)

    def read(self, position, length):
        """Return ``length`` bytes starting at ``position`` as a new bytearray.

        Calls the module's ``delay`` helper once per byte read.

        Args:
            position: Index of the first byte to read.
            length: Number of bytes to read.

        Raises:
            ValueError: If the range does not fit on the tape.
        """
        # The bounds check must use the requested length; there is no ``data``
        # argument on the read path.
        self._check_bounds(position, length)
        output = bytearray()
        print('Reading data', end='')
        for offset in range(length):
            delay('.', 0.05, newline=False)
            output.append(self.storage[position + offset])
        print('done.')
        return output
class TapeStorage(object):
    """Shelf with a fixed number of tape slots, keyed by tape id.

    Attributes:
        size: Maximum number of tapes the shelf tracks (class-level).
        tapes: Mapping of tape id to the ``Tape`` stored under that id.
    """

    size = 24

    def __init__(self):
        self.tapes = {}

    def insert_tape(self, tape):
        """Put ``tape`` on the shelf under its ``tape_id``.

        Raises:
            OutOfSpace: If the tape is not already tracked and every slot is
                taken.
        """
        # retrieve_tape leaves the entry in ``tapes``, so a tape coming back
        # already owns a slot and must not be counted as a new arrival;
        # otherwise a full shelf could never take back a tape it handed out.
        is_new = tape.tape_id not in self.tapes
        if is_new and len(self.tapes) >= self.size:
            raise OutOfSpace('No room for additional tapes')
        delay('Returning tape {} to storage...'.format(tape.tape_id), 1)
        self.tapes[tape.tape_id] = tape

    def retrieve_tape(self, tape_id):
        """Return the tape stored under ``tape_id``, or a fresh blank one.

        A new ``Tape`` is created only when the id is unknown and the shelf
        still has a free slot. The shelf entry is not removed on retrieval.

        Raises:
            TapeNotFound: If the id is unknown and no free slot remains.
        """
        known = tape_id in self.tapes
        if not known and len(self.tapes) >= self.size:
            # TapeNotFound subclasses Exception, so callers that caught the
            # previous bare Exception keep working.
            raise TapeNotFound(
                'Tape {} is not in storage and no slot is free'.format(tape_id))
        delay('Retrieving tape {} from storage...'.format(tape_id), 1)
        if known:
            return self.tapes[tape_id]
        # Build a blank tape only when one is actually needed.
        return Tape(tape_id)
class TapeLibrary(object):
    """A single tape drive backed by a ``TapeStorage`` shelf.

    Attributes:
        storage: The shelf tapes are fetched from and returned to.
        tape: The tape currently in the drive, or ``None``.
        position: Current head position on the loaded tape.
    """

    def __init__(self):
        self.storage = TapeStorage()
        self.tape = None
        self.position = 0

    def read_data(self, tape_id, position, data_length):
        """Load ``tape_id``, seek to ``position`` and read ``data_length`` bytes."""
        self.load_tape(tape_id)
        self.seek(position)
        payload = self.tape.read(position, data_length)
        # The head ends up just past the last byte read.
        self.position = position + data_length
        return payload

    def write_data(self, tape_id, position, data):
        """Load ``tape_id``, seek to ``position`` and write ``data`` there."""
        self.load_tape(tape_id)
        self.seek(position)
        self.tape.write(position, data)
        # The head ends up just past the last byte written.
        self.position = position + len(data)

    def load_tape(self, tape_id):
        """Make sure the tape with ``tape_id`` is the one in the drive."""
        already_loaded = self.tape and self.tape.tape_id == tape_id
        if not already_loaded:
            self.store_tape()
            self.insert(self.storage.retrieve_tape(tape_id))

    def store_tape(self):
        """Eject the loaded tape, if any, and hand it back to storage."""
        ejected = self.eject()
        if ejected:
            self.storage.insert_tape(ejected)

    def eject(self):
        """Rewind and remove the loaded tape; return it, or ``None`` if empty."""
        loaded = self.tape
        if not loaded:
            return None
        self.rewind()
        delay('Ejecting tape {}...'.format(loaded.tape_id), 1)
        self.tape = None
        return loaded

    def insert(self, tape):
        """Place ``tape`` in the drive."""
        delay('Loading tape {}...'.format(tape.tape_id), 1)
        self.tape = tape

    def seek(self, position):
        """Move the head to ``position``, calling ``delay`` once per step.

        Raises:
            Exception: If ``position`` is outside ``[0, max_size)``.
        """
        capacity = self.tape.max_size
        if not (0 <= position < capacity):
            raise Exception
        steps = abs(position - self.position)
        if not steps:
            # Already there: nothing is printed and nothing changes.
            return
        print('Seeking to {}'.format(position), end='')
        # A full-length seek adds up to two seconds of delay.
        per_step = 2.0 / capacity
        for _ in range(steps):
            delay('.', per_step, newline=False)
        self.position = position
        print('done.')

    def rewind(self):
        """Seek back to the start of the tape."""
        self.seek(0)
class TapeLocation(object):
    """Address of a run of bytes: which tape, where on it, and how long."""

    def __init__(self, tape, position, size):
        self.tape_id, self.tape_position, self.size = tape, position, size

    def __copy__(self):
        """Return an independent ``TapeLocation`` with the same three fields."""
        return TapeLocation(
            tape=self.tape_id,
            position=self.tape_position,
            size=self.size,
        )
class File(object):
    """Ordered list of tape locations that together make up one file."""

    def __init__(self):
        self.locations = list()

    def get_read_operations(self):
        """Yield a ``(tape_id, tape_position, size)`` tuple per location, in order."""
        yield from (
            (entry.tape_id, entry.tape_position, entry.size)
            for entry in self.locations
        )
class StorageManager(object):
    """Stores named byte strings across tapes and reads them back.

    Attributes:
        library: The ``TapeLibrary`` used for all reads and writes.
        files: Mapping of filename to the ``File`` recording its locations.
        next_location: ``TapeLocation`` where the next stored file begins.
    """

    def __init__(self):
        self.library = TapeLibrary()
        self.files = {}
        self.next_location = TapeLocation(0, 0, 0)

    def store_file(self, filename, data):
        """Write ``data`` starting at ``next_location`` and record it.

        Data that does not fit on the current tape continues at position 0 of
        the tape with the next id. An existing entry for ``filename`` is
        replaced in ``files``.

        Args:
            filename: Key under which the file is recorded.
            data: Sliceable byte sequence to store.
        """
        location = copy(self.next_location)
        stored = File()
        while data:
            self.library.load_tape(location.tape_id)
            tape_size = self.library.tape.max_size
            chunk_size = min(tape_size - location.tape_position, len(data))
            self.library.write_data(
                location.tape_id, location.tape_position, data[:chunk_size])
            # Record this chunk as its own location with the size written.
            extent = copy(location)
            extent.size = chunk_size
            stored.locations.append(extent)
            data = data[chunk_size:]
            location.tape_position += chunk_size
            # Roll over as soon as the tape is full, even when no data is
            # left. Otherwise next_location would sit at position == max_size
            # and the next store_file would fail in seek().
            if location.tape_position >= tape_size:
                location.tape_id += 1
                location.tape_position = 0
        self.files[filename] = stored
        self.next_location = location

    def retrieve_file(self, filename):
        """Read back and return the bytes stored under ``filename``.

        Returns:
            A ``bytearray`` with the file's chunks concatenated in order.

        Raises:
            KeyError: If no file was stored under ``filename``.
        """
        if filename not in self.files:
            # KeyError subclasses Exception, so callers that caught the
            # previous bare Exception keep working.
            raise KeyError('No stored file named {!r}'.format(filename))
        stored = self.files[filename]
        data = bytearray()
        for op in stored.get_read_operations():
            data += self.library.read_data(*op)
        return data
def delay(reason, duration, newline=True):
    """Print ``reason`` immediately, then sleep for ``duration`` seconds.

    Args:
        reason: Text to print; output is flushed right away.
        duration: Seconds to sleep after printing.
        newline: When false, no line break is printed after ``reason``.
    """
    terminator = '\n' if newline else ''
    print(reason, end=terminator, flush=True)
    sleep(duration)
# Demo: store three payloads back to back, then read them back in a different
# order so the drive has to seek and swap tapes.
y = StorageManager()

data = bytearray('I CAN STORE EMOJI π±ππ'.encode('utf-8'))
otherdata = bytearray('this is a test of text data'.encode('ascii'))
bigdata = bytearray('Four score and seven years ago our fathers brought forth on this continent, a new nation, conceived in Liberty, and dedicated to the proposition that all men are created equal.'.encode('ascii'))

# Store order determines where each file lands on tape.
for _name, _payload in (
    ('myfile', data),
    ('otherfile', otherdata),
    ('gettysburg', bigdata),
):
    y.store_file(_name, _payload)

# Each file is decoded with the encoding it was stored in.
for _name, _encoding in (
    ('myfile', 'utf-8'),
    ('gettysburg', 'ascii'),
    ('otherfile', 'ascii'),
):
    print(y.retrieve_file(_name).decode(_encoding))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment