# mastofuse.py (forked from halcy/mastofuse.py)
# @killerbees19, created March 14, 2025 20:44
import sys, os, stat, errno, operator, time, datetime, requests
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError
from mastodon import Mastodon
from cachetools import TTLCache, cachedmethod
from mastodon.return_types import MediaAttachment, Account, Status

class PathItem:
    """A resolved filesystem node: a directory, a regular file, or a symlink."""
    def __init__(self, path_type, mtime=None, size=0, symlink_target=None, read_fn=None, listdir_fn=None):
        self.path_type = path_type
        self.mtime = mtime or time.time()
        self.size = size
        self.symlink_target = symlink_target
        self._read_fn = read_fn
        self._listdir_fn = listdir_fn

    def read(self, offset, length):
        if self.path_type != "file":
            raise FuseOSError(errno.EISDIR)
        if not self._read_fn:
            return b""
        data = self._read_fn()
        return data[offset: offset + length]

    def listdir(self):
        if self.path_type != "dir":
            raise FuseOSError(errno.ENOTDIR)
        if not self._listdir_fn:
            return []
        return self._listdir_fn()

class MastoFS(LoggingMixIn, Operations):
    def __init__(self, url, token):
        self.api = Mastodon(
            access_token=token,
            api_base_url=url,
        )
        self.mastodon_object_cache = TTLCache(maxsize=100, ttl=5)
        self.long_term_posts = TTLCache(maxsize=5000, ttl=86400)
        self.long_term_accounts = TTLCache(maxsize=5000, ttl=86400)
        self.write_buffers = {}
        self.reblog_buffer = None  # status id written to /posts/reblogged/<...>/id, reblogged on release
        self.reblog_last_account = ""
        # Static directories
        self.base_files = {
            '': {
                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                'children': {
                    'posts': {
                        'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                        'children': {
                            'new': {
                                'file': dict(st_mode=(0o644 | stat.S_IFREG), st_nlink=1, st_size=0),
                                'children': None,
                            },
                            'reblogged': {
                                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                                'children': {}
                            },
                        },
                    },
                    'accounts': {
                        'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                        'children': {
                            'me': {
                                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                                'children': {}
                            },
                        },
                    },
                    'timelines': {
                        'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                        'children': {
                            'home': {
                                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                                'children': {}
                            },
                            'local': {
                                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                                'children': {}
                            },
                            'federated': {
                                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                                'children': {}
                            },
                            'public': {
                                'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
                                'children': {}
                            },
                        },
                    },
                },
            },
        }
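        # The static skeleton above yields a mount tree roughly like this (a sketch;
        # dynamic entries such as post and account ids are filled in at runtime):
        #
        #   /posts/new          write text here and close the file to publish a status
        #   /posts/reblogged/   copy a post directory here to reblog it
        #   /posts/<id>/        one directory per known status, fields exposed as files
        #   /accounts/me/       the authenticated account
        #   /accounts/<id>/     one directory per known account
        #   /timelines/{home,local,federated,public}/<n>   symlinks into /posts/<id>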

    @cachedmethod(operator.attrgetter('mastodon_object_cache'))
    def _get_post_cached(self, post_id):
        post = self.api.status(post_id)
        if post is not None:
            self.long_term_posts[post_id] = True
        return post

    @cachedmethod(operator.attrgetter('mastodon_object_cache'))
    def _get_account_cached(self, account_id):
        acct = self.api.account(account_id)
        if acct is not None:
            self.long_term_accounts[account_id] = True
        return acct

    @cachedmethod(operator.attrgetter('mastodon_object_cache'))
    def _get_timeline_cached(self, timeline_type):
        if timeline_type == 'home':
            return self.api.timeline_home()
        elif timeline_type == 'local':
            return self.api.timeline_local()
        elif timeline_type == 'federated':
            return self.api.timeline_public()
        elif timeline_type == 'public':
            return self.api.timeline_public(local=False)
        return []

    def _get_base_file(self, path):
        if path == '/':
            return self.base_files['']
        curr = self.base_files['']
        parts = path.strip('/').split('/')
        for p in parts:
            kids = curr.get('children') or {}  # 'children' is None for leaf entries like /posts/new
            if p in kids:
                curr = kids[p]
            else:
                return None
        return curr

    def _extract_time(self, obj):
        t = None
        if isinstance(obj, dict) and "created_at" in obj:
            t = obj["created_at"]
        elif hasattr(obj, "created_at"):
            t = getattr(obj, "created_at")
        if not t:
            return time.time()
        if isinstance(t, str):
            try:
                dt = datetime.datetime.fromisoformat(t)
                return dt.timestamp()
            except Exception:
                return time.time()
        if isinstance(t, datetime.datetime):
            return t.timestamp()
        return time.time()

    def _fetch_media(self, url):
        try:
            r = requests.get(url)
            r.raise_for_status()
            return r.content
        except Exception:
            return b""

    def _list_keys(self, obj):
        if isinstance(obj, list):
            return [str(i) for i in range(len(obj))]
        if isinstance(obj, dict):
            base_keys = list(obj.keys())
            if isinstance(obj, MediaAttachment):
                # Media attachments get synthetic entries that expose the downloaded media bytes
                if "url" in obj and "file" not in base_keys:
                    base_keys.append("file")
                if "preview_url" in obj and "preview_file" not in base_keys:
                    base_keys.append("preview_file")
            return base_keys
        return []

    def _get_child(self, obj, key):
        if isinstance(obj, list):
            i = int(key)
            return obj[i]
        if isinstance(obj, dict):
            if isinstance(obj, MediaAttachment):
                if key == "file" and "url" in obj:
                    return self._fetch_media(obj["url"])
                if key == "preview_file" and "preview_url" in obj:
                    return self._fetch_media(obj["preview_url"])
            return obj[key]
        raise KeyError("No children")

    def _traverse(self, obj, parts):
        if not parts:
            return obj
        return self._traverse(self._get_child(obj, parts[0]), parts[1:])

    def _resolve_path(self, path):
        # static base tree
        base_item = self._get_base_file(path)
        base_children = None
        if base_item is not None:
            if base_item["children"] is None:
                return PathItem("file", time.time(), 0, read_fn=lambda: b"")
            else:
                base_children = list(base_item["children"].keys())
        # Split to parts
        parts = path.strip('/').split('/')
        # dynamic /posts tree
        if parts[0] == 'posts':
            return self._resolve_posts(path, parts)
        # /accounts tree
        if parts[0] == 'accounts':
            return self._resolve_accounts(path, parts)
        # /timelines tree
        if parts[0] == 'timelines':
            return self._resolve_timelines(path, parts)
        # / specifically
        if base_children is not None:
            return PathItem("dir", time.time(), listdir_fn=lambda: base_children)
        # If we get here, it's not a valid path
        raise FuseOSError(errno.ENOENT)

    def _resolve_posts(self, path, parts):
        if len(parts) == 1:
            # /posts
            node = self._get_base_file(path)
            # Get the list of posts from the long term cache
            def list_posts():
                base = list(node['children'].keys()) if node and 'children' in node else []
                for pid in self.long_term_posts:
                    if pid not in base:
                        base.append(pid)
                return base
            return PathItem("dir", time.time(), listdir_fn=list_posts)
        if parts[1] == 'reblogged':
            # /posts/reblogged
            if len(parts) == 2:
                node = self._get_base_file(path)
                if node:
                    return PathItem("dir", time.time(), listdir_fn=lambda: list(node.get('children', {}).keys()))
                return PathItem("dir", time.time(), listdir_fn=lambda: [])
            raise FuseOSError(errno.ENOENT)
        # /posts/<id>/...
        post_id = parts[1]
        post_obj = self._get_post_cached(post_id)
        if not post_obj:
            raise FuseOSError(errno.ENOENT)
        if len(parts) == 2:
            # Post directory
            t = self._extract_time(post_obj)
            def listdir_post():
                return self._list_keys(post_obj)
            return PathItem("dir", t, listdir_fn=listdir_post)
        # Traverse deeper into the object
        try:
            final_obj = self._traverse(post_obj, parts[2:])
        except (KeyError, IndexError):
            raise FuseOSError(errno.ENOENT)
        return self._make_item_for_obj(final_obj)

    def _resolve_accounts(self, path, parts):
        # /accounts
        if len(parts) == 1:
            # top-level directory => static + known account IDs
            node = self._get_base_file(path)
            def list_accounts():
                base = list(node['children'].keys()) if node and 'children' in node else []
                # add known from long_term
                for aid in self.long_term_accounts:
                    if aid not in base:
                        base.append(aid)
                return base
            return PathItem("dir", time.time(), listdir_fn=list_accounts)
        # /accounts/me
        if parts[1] == 'me':
            acct = self.api.account_verify_credentials()
            if acct:
                self.long_term_accounts[str(acct.id)] = True
            if len(parts) == 2:
                t = self._extract_time(acct)
                def list_me():
                    return self._list_keys(acct)
                return PathItem("dir", t, listdir_fn=list_me)
            try:
                sub = self._traverse(acct, parts[2:])
            except (KeyError, IndexError):
                raise FuseOSError(errno.ENOENT)
            return self._make_item_for_obj(sub)
        # /accounts/<id>/...
        account_id = parts[1]
        acct_obj = self._get_account_cached(account_id)
        if not acct_obj:
            raise FuseOSError(errno.ENOENT)
        if len(parts) == 2:
            # Account directory
            t = self._extract_time(acct_obj)
            def list_acct():
                return self._list_keys(acct_obj)
            return PathItem("dir", t, listdir_fn=list_acct)
        # Traverse deeper into the object
        try:
            sub = self._traverse(acct_obj, parts[2:])
        except (KeyError, IndexError):
            raise FuseOSError(errno.ENOENT)
        return self._make_item_for_obj(sub)

    def _resolve_timelines(self, path, parts):
        # /timelines
        if len(parts) == 1:
            node = self._get_base_file(path)
            def list_top():
                return list(node['children'].keys()) if node and 'children' in node else []
            return PathItem("dir", time.time(), listdir_fn=list_top)
        if len(parts) == 2:
            node = self._get_base_file(path)
            def list_tl():
                base = list(node['children'].keys()) if node and 'children' in node else []
                posts = self._get_timeline_cached(parts[1])
                base.extend(str(i) for i in range(len(posts)))
                return base
            return PathItem("dir", time.time(), listdir_fn=list_tl)
        if len(parts) == 3:
            timeline_type = parts[1]
            idx_s = parts[2]
            try:
                idx = int(idx_s)
            except ValueError:
                raise FuseOSError(errno.ENOENT)
            posts = self._get_timeline_cached(timeline_type)
            if idx < 0 or idx >= len(posts):
                raise FuseOSError(errno.ENOENT)
            post = posts[idx]
            def symlink_target():
                return f"/posts/{post.id}"
            t = self._extract_time(post)
            return PathItem("symlink", t, symlink_target=symlink_target)
        raise FuseOSError(errno.ENOENT)

    def _make_item_for_obj(self, obj):
        # Turn the object into a PathItem
        if isinstance(obj, Account):
            def symlink_target():
                return f"/accounts/{obj.id}"
            return PathItem("symlink", self._extract_time(obj), symlink_target=symlink_target)
        if isinstance(obj, Status):
            def symlink_target():
                return f"/posts/{obj.id}"
            return PathItem("symlink", self._extract_time(obj), symlink_target=symlink_target)
        if isinstance(obj, (dict, list)):
            t = self._extract_time(obj)
            def listdir_fn():
                return self._list_keys(obj)
            return PathItem("dir", t, listdir_fn=listdir_fn)
        if isinstance(obj, bytes):
            data_bytes = obj
            size = len(data_bytes)
            t = time.time()
            return PathItem("file", t, size=size, read_fn=lambda: data_bytes)
        # If nothing else: treat as text
        data_str = str(obj).encode('utf-8')
        size = len(data_str)
        t = time.time()
        return PathItem("file", t, size=size, read_fn=lambda: data_str)

    def getattr(self, path, fh=None):
        # We special case the /posts/reblogged directory to allow basically everything without error messages
        # We drop pretty much all writes though
        if path.startswith('/posts/reblogged'):
            parts = path.split('/')
            if len(parts) <= 4 or parts[-1] in ["mentions", "media_attachments", "emojis", "tags", "filtered", "application"]:
                st = {
                    'st_mode': (0o755 | stat.S_IFDIR),
                    'st_nlink': 2,
                    'st_size': 0,
                    'st_mtime': time.time(),
                    'st_atime': time.time(),
                    'st_ctime': time.time()
                }
            else:
                if parts[-1] == "account":
                    if path != self.reblog_last_account:
                        raise FuseOSError(errno.ENOENT)
                    else:
                        st = {
                            'st_mode': (0o755 | stat.S_IFLNK),
                            'st_nlink': 1,
                            'st_size': 0,
                            'st_mtime': time.time(),
                            'st_atime': time.time(),
                            'st_ctime': time.time()
                        }
                    return st
                st = {
                    'st_mode': (0o644 | stat.S_IFREG),
                    'st_nlink': 1,
                    'st_size': 0,
                    'st_mtime': time.time(),
                    'st_atime': time.time(),
                    'st_ctime': time.time()
                }
            return st

        # Otherwise, just resolve and return correct attrs
        item = self._resolve_path(path)
        mode = 0
        if item.path_type == "dir":
            mode = (0o755 | stat.S_IFDIR)
        elif item.path_type == "file":
            mode = (0o644 | stat.S_IFREG)
        elif item.path_type == "symlink":
            mode = (0o755 | stat.S_IFLNK)
        st = {
            'st_mode': mode,
            'st_nlink': 1,
            'st_size': item.size,
            'st_mtime': item.mtime,
            'st_atime': item.mtime,
            'st_ctime': item.mtime
        }
        return st

    def readdir(self, path, fh):
        # Return the list of files in a directory
        item = self._resolve_path(path)
        if item.path_type != "dir":
            raise FuseOSError(errno.ENOTDIR)
        entries = ['.', '..']
        entries.extend(item.listdir())
        return entries

    def readlink(self, path):
        # Return the target of the symlink
        item = self._resolve_path(path)
        if item.path_type != "symlink":
            raise FuseOSError(errno.EINVAL)
        target = item.symlink_target
        if callable(target):
            target = target()
        return os.path.relpath(target, os.path.dirname(path))
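        # A worked example of the relative-target computation above (assuming a home
        # timeline entry that points at status id 123): readlink("/timelines/home/0")
        # yields os.path.relpath("/posts/123", "/timelines/home") == "../../posts/123",
        # so the symlink stays valid regardless of where the filesystem is mounted.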

    def read(self, path, size, offset, fh):
        # Return contents of the attribute (or potentially downloaded media file)
        item = self._resolve_path(path)
        if item.path_type != "file":
            raise FuseOSError(errno.EISDIR)
        return item.read(offset, size)

    def write(self, path, data, offset, fh):
        # Writing to /posts/new buffers the text; the post is published on close
        if path == '/posts/new':
            if path not in self.write_buffers:
                self.write_buffers[path] = b""
            self.write_buffers[path] += data
            return len(data)
        # Writing to /posts/reblogged/<anything>/id stores the id to reblog on close
        if path.startswith('/posts/reblogged'):
            if len(path.split('/')) == 5:
                if path.split('/')[-1] == "id":
                    self.reblog_buffer = data.decode('utf-8')
            return len(data)
        raise FuseOSError(errno.EROFS)

    def create(self, path, mode, fi=None):
        # required for writing to work correctly in some cases
        if path == '/posts/new':
            self.write_buffers[path] = b""
            return 0
        # Better strategy: allow creation in general, then reblog when writing an id to the appropriate place
        if path.startswith('/posts/reblogged'):
            return 0
        # no creation allowed generally
        raise FuseOSError(errno.EROFS)

    # Have to allow directory creation under /posts/reblogged
    def mkdir(self, path, mode):
        if path.startswith('/posts/reblogged'):
            return 0
        raise FuseOSError(errno.EROFS)

    # Also have to allow file deletion under /posts/reblogged
    # I'm not sure why but symlink copy breaks otherwise??
    def unlink(self, path):
        if path.startswith('/posts/reblogged'):
            return 0
        raise FuseOSError(errno.EROFS)

    def release(self, path, fh):
        # on close: post whatever is in the write buffer
        if path == '/posts/new':
            buf = self.write_buffers.get(path, b"")
            if buf:
                text = buf.decode('utf-8')
                try:
                    newp = self.api.status_post(text)
                    if newp:
                        self.long_term_posts[str(newp.id)] = True
                except Exception:
                    pass
                del self.write_buffers[path]
        # on closing a reblogged post, reblog it
        if path.startswith('/posts/reblogged/'):
            if len(path.split('/')) == 5:
                if path.split('/')[-1] == "id":
                    print("CLOSING ID", self.reblog_buffer)
                    try:
                        self.api.status_reblog(self.reblog_buffer)
                        self.reblog_buffer = None
                    except Exception:
                        pass
        return 0

    def truncate(self, path, length, fh=None):
        # required for writing to work correctly in many cases
        if path == '/posts/new':
            self.write_buffers[path] = b""
            return 0
        if path.startswith('/posts/reblogged'):
            return 0
        raise FuseOSError(errno.EROFS)

    def symlink(self, target, source):
        if target.startswith('/posts/reblogged'):
            self.reblog_last_account = target
            return 0
        raise FuseOSError(errno.EROFS)

if __name__ == '__main__':
    if len(sys.argv) != 4:
        print("Usage: {} <mountpoint> <url> <token>".format(sys.argv[0]))
        sys.exit(1)
    mountpoint = sys.argv[1]
    url = sys.argv[2]
    token = sys.argv[3]
    FUSE(MastoFS(url, token), mountpoint, nothreads=True, foreground=True)
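
# Example session (a sketch; the mountpoint, instance URL, and token below are
# placeholders, and the host needs FUSE plus the fusepy, Mastodon.py, cachetools,
# and requests packages installed):
#
#   python3 mastofuse.py /mnt/masto https://mastodon.example <access-token>
#   ls /mnt/masto/timelines/home                   # numbered symlinks into /posts/<id>
#   cat /mnt/masto/timelines/home/0/content        # read the content field of the first home-timeline entry
#   echo "hello from FUSE" > /mnt/masto/posts/new  # publishes a status when the file is closed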