Last active
April 3, 2018 05:39
-
-
Save juntalis/87c8cd049092dd5e169d134ebd8112d4 to your computer and use it in GitHub Desktop.
Python 3 only - cba to fuck with compatibility shims.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8
import array
import contextlib
import errno
import functools
import io
import os
import tempfile
import weakref
# Simplified vs the stdlib code: the module-level constants of ``tempfile``
# are re-declared here and its private helpers are borrowed by alias.

# Falls back to 10000 when ``os`` does not define TMP_MAX.
# (Not referenced by the code visible in this file.)
TMP_MAX = getattr(os, 'TMP_MAX', 10000)

# Exclusive read/write creation flags; O_NOFOLLOW / O_BINARY contribute
# nothing (0) on platforms whose ``os`` module does not define them.
_text_openflags = (
    os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, 'O_NOFOLLOW', 0)
)
_bin_openflags = _text_openflags | getattr(os, 'O_BINARY', 0)

template = 'tmp'

# Private stdlib helpers reused as-is.
_mkstemp_inner = tempfile._mkstemp_inner
_sanitize_params = tempfile._sanitize_params
_TemporaryFileWrapper = tempfile._TemporaryFileWrapper
############### | |
# Initial POC # | |
############### | |
# Typical usage would be:
#
# >>> with NamedTemporaryFile() as fp:
# ...
# ...     # user code builds the contents of fp.
# ...     ...
# ...     # a call is made to a reader function.
# ...     some_reader(fp)
# ...
# >>> def some_reader(fp):
# ...     with fp.reopen(mode='rb') as fpin:
# ...         # stuff
#
# EDIT: May have broken it when I started down the path of
# the second approach. Originally there was no tracking
# of duplicated file descriptors - it just expected that
# every handle returned by ``reopen`` would be closed
# before the ``TemporaryFileWrapper`` instance was closed.
class TemporaryFileWrapper(_TemporaryFileWrapper):
    """
    Temporary file wrapper that can hand out extra file objects for the
    same underlying file through :meth:`reopen`.

    The descriptor passed to the constructor is kept as :attr:`init_fd`;
    every call to :meth:`reopen` duplicates it and records the duplicate in
    :attr:`open_fds` until the file object built on it is garbage collected
    (or :meth:`remove_fd` is called).

    :param fd: Open file descriptor of the temporary file
    :type fd: int
    :param name: Path of the temporary file
    :type name: str
    :param mode: File mode passed to ``io.open``
    :type mode: str
    :param buffering: Buffering policy passed to ``io.open``
    :type buffering: int
    :param encoding: Encoding passed to ``io.open``
    :type encoding: str | None
    :param newline: Newline handling passed to ``io.open``
    :type newline: str | None
    :param delete: Forwarded to the stdlib wrapper's ``delete`` argument
    :type delete: bool
    """

    def __init__(self, fd, name, mode='w+b', buffering=-1, encoding=None,
                 newline=None, delete=True):
        # Store init_fd & initial ``io.open`` kwargs so that ``reopen`` can
        # reuse them as defaults.
        self.init_fd = fd
        self.open_fds = set()
        self.open_kwargs = dict(mode=mode, buffering=buffering,
                                newline=newline, encoding=encoding)
        # Create the primary file object from the init_fd.
        file = io.open(fd, **self.open_kwargs)
        super(TemporaryFileWrapper, self).__init__(file, name, delete=delete)

    def on_release(self, fd):
        """
        Called when an associated file object is deleted/GCed.

        The descriptor is only forgotten, not closed.

        :param fd: File descriptor of the file object that was gced
        :type fd: int
        """
        self.remove_fd(fd, False)

    def add_fd(self, fd, file):
        """
        Given a newly created file object and its corresponding
        file descriptor, add a new reference to track.

        A descriptor that is already tracked is left untouched (no second
        finalizer is registered).

        :param fd: File descriptor
        :type fd: int
        :param file: file object
        :type file: file
        """
        if fd in self.open_fds:
            return
        self.open_fds.add(fd)
        # Stop tracking the descriptor once its file object goes away.
        weakref.finalize(file, self.on_release, fd)

    def remove_fd(self, fd, close=True):
        """
        Stop tracking a duplicated file descriptor. Unknown descriptors are
        ignored.

        :param fd: File descriptor
        :type fd: int
        :param close: Whether or not to close the fd
        :type close: bool
        """
        if fd not in self.open_fds:
            return
        self.open_fds.remove(fd)
        if close:
            os.close(fd)

    def reopen(self, **kwargs):
        """
        Opens a new file object to the temporary file. Any unspecified parameter
        reuses the original values from the constructor. See `io.open` for detailed
        info on the parameters.

        Pending writes on the primary file object are flushed first so the
        new handle can see them. The new handle is built on ``os.dup`` of
        :attr:`init_fd`; duplicated descriptors share the file position with
        the original (POSIX ``dup`` semantics), so callers normally need to
        ``seek`` before reading. ``closefd`` is always forced to ``True``.

        :param mode: File mode to open in
        :type mode: str | None
        :param buffering: Buffering policy
        :type buffering: int | None
        :param encoding: Name of encoding to use
        :type encoding: str | None
        :param newline: Newline character
        :type newline: str | None
        :return: Newly created file object
        :rtype: file
        """
        # Without this, data still sitting in the primary object's buffer
        # would be invisible through the duplicated descriptor.
        if not self.file.closed:
            self.file.flush()
        dup_fd = os.dup(self.init_fd)
        open_kwargs = self.open_kwargs.copy()
        open_kwargs.update(kwargs)
        open_kwargs['closefd'] = True
        try:
            # noinspection PyTypeChecker
            file = io.open(dup_fd, **open_kwargs)
        except BaseException:
            # Do not leak the duplicate when io.open rejects the arguments.
            # io.open may already have closed it on some failure paths, so
            # a failing close is ignored.
            with contextlib.suppress(OSError):
                os.close(dup_fd)
            raise
        self.add_fd(dup_fd, file)
        return file

    @property
    def ref_count(self):
        """Number of duplicated descriptors currently being tracked."""
        return len(self.open_fds)
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
                       newline=None, suffix=None, prefix=None,
                       dir=None, delete=True):
    """Create and return a temporary file.

    Arguments:
    'prefix', 'suffix', 'dir' -- as for mkstemp.
    'mode' -- the mode argument to io.open (default "w+b").
    'buffering' -- the buffer size argument to io.open (default -1).
    'encoding' -- the encoding argument to io.open (default None)
    'newline' -- the newline argument to io.open (default None)
    'delete' -- whether the file is deleted on close (default True).

    The file is created as mkstemp() would do it.

    Returns an object with a file-like interface; the name of the file
    is accessible as its 'name' attribute. The file will be automatically
    deleted when it is closed unless the 'delete' argument is set to False.

    If wrapping the descriptor fails, the descriptor is closed and the
    file removed (both best-effort) before the original exception is
    re-raised.
    """
    flags = _bin_openflags
    prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)

    # Setting O_TEMPORARY in the flags causes the OS to delete
    # the file when it is closed. This is only supported by Windows.
    if os.name == 'nt' and delete:
        flags |= os.O_TEMPORARY

    (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
    try:
        return TemporaryFileWrapper(fd, name, mode=mode, buffering=buffering,
                                    encoding=encoding, newline=newline,
                                    delete=delete)
    except BaseException:
        # Close before unlinking: an open file cannot be unlinked on
        # Windows. Both steps are best-effort so the original error is
        # never masked: io.open may already have closed the fd, and with
        # O_TEMPORARY the file is gone as soon as the fd is closed.
        with contextlib.suppress(OSError):
            os.close(fd)
        with contextlib.suppress(OSError):
            os.unlink(name)
        raise
#################### | |
# Expanded Attempt # | |
#################### | |
# When the initial POC worked, I started playing around with the idea of
# expanding the attempt to track the duplicated file descriptors and
# not allow the original fd to be closed until all dups were either closed
# or unused. Lost interest about halfway in when I realized how much work
# would need to be done to handle some of the potential issues with this approach.
class HandleTracker:
    """
    Tracks open file descriptors. Maintains a pool of unused file descriptors
    from past calls to `open` because that's totally not a premature optimization that
    would never see a warranting use case.

    NOTE(review): ``max_unused`` is stored but nothing enforces it yet, and
    pooled descriptors are never reused by :meth:`open`.

    :type unused_fds: array.array
    :type used_fds: dict[int,weakref.ref]
    """
    __slots__ = ('max_unused', 'used_fds', 'unused_fds',
                 'init_fd', 'open_kwargs',)

    def __init__(self, max_unused=5, init_fd=None, open_kwargs=None):
        """
        Constructor - initializes empty containers

        :param max_unused: Configure the maximum number of unused fds we can pool
        :type max_unused: int
        :param init_fd: Descriptor that :meth:`open` duplicates. Optional;
            :meth:`open` refuses to run without it.
        :type init_fd: int | None
        :param open_kwargs: Default ``io.open`` keyword arguments for :meth:`open`
        :type open_kwargs: dict | None
        """
        self.used_fds = dict()
        self.max_unused = max_unused
        self.unused_fds = array.array('i')
        self.init_fd = init_fd
        # Copy so later changes to the caller's dict do not leak in.
        self.open_kwargs = dict(open_kwargs) if open_kwargs else {}

    def _ensure(self, fd, file=None, ensure_used=False):
        """
        Ensure that a fd is used/unused

        :param fd: File descriptor
        :type fd: int
        :param file: File object. Required to ensure unused
        :type file: file | None
        :param ensure_used: ``True`` to ensure used. ``False`` otherwise.
        :type ensure_used: bool
        :return: Whether or not it meets the ensured criteria
        :rtype: bool
        """
        if ensure_used:
            return fd in self.used_fds
        # Remaining logic ensures unused
        fileref = self.used_fds.get(fd)
        if fileref is None:
            # Not in our used dict - we're good.
            return True
        # fd exists in our used_fds dict. If it is attached to the very
        # same file object, we're still okay.
        return file is not None and fileref() is file

    def _remove_unused(self, fd, errors=False):
        """
        Attempt to remove a file descriptor from the :attr:`unused_fds` pool.

        :param fd: File descriptor
        :type fd: int
        :param errors: Whether or not to throw errors for unknown fds.
        :type errors: bool
        :raises KeyError: when :param:`errors` is set to ``True`` and ``fd`` is unknown.
        """
        try:
            self.unused_fds.remove(fd)
        except ValueError:
            # array.array.remove signals a missing item with ValueError;
            # translate it to the documented KeyError.
            if errors:
                raise KeyError('Unknown fd specified: {}'.format(fd)) from None

    def _on_delete(self, fd):
        """
        Called when an associated file object is deleted/GCed.

        :param fd: File descriptor of the file object that was gced
        :type fd: int
        """
        self.release_fd(fd, errors=False)

    def use_fd(self, fd, file):
        """
        Given a newly created file object and its corresponding
        file descriptor, add a new reference to track.

        :param fd: File descriptor
        :type fd: int
        :param file: file object
        :type file: file
        :raises RuntimeError: when the specified :param:`fd` is already being used.
        """
        # If this fd is already attached to another file object, we have a problem.
        if not self._ensure(fd, file=file):
            raise RuntimeError(
                'fd attached to another file object without closing previous!'
            )
        # Add the pair to our used_fds dict.
        self.used_fds[fd] = weakref.ref(file)
        # Attach a finalizer callback to the file - just in case.
        weakref.finalize(file, self._on_delete, fd)
        # Attempt to remove the fd from our unused_fds pool.
        self._remove_unused(fd)

    def release_fd(self, fd, errors=True, close=False):
        """
        Release a used fd. Unless ``close`` is set, the fd is moved to the
        :attr:`unused_fds` pool.

        :param fd: File descriptor
        :type fd: int
        :param errors: Should an error be thrown if fd is not known?
        :type errors: bool
        :param close: Close the fd?
        :type close: bool
        :raises KeyError: when :param:`errors` is set to ``True`` and ``fd`` is unknown.
        """
        # Ensure that this is a tracked in-use fd
        if not self._ensure(fd, ensure_used=True):
            if not errors:
                return
            raise KeyError('Unknown fd specified: {}'.format(fd))
        # Remove fd from our dict
        self.used_fds.pop(fd)
        # Branch logic depending on whether we need to close the fd.
        if close:
            return os.close(fd)
        self.unused_fds.append(fd)

    def open(self, **kwargs):
        """
        Opens a new file object to the temporary file. Any unspecified parameter
        reuses the values given to the constructor. See `io.open` for detailed
        info on the parameters. ``closefd`` is always forced to ``True``.

        :param mode: File mode to open in
        :type mode: str | None
        :param buffering: Buffering policy
        :type buffering: int | None
        :param encoding: Name of encoding to use
        :type encoding: str | None
        :param newline: Newline character
        :type newline: str | None
        :return: Newly created file object
        :rtype: file
        :raises RuntimeError: when no ``init_fd`` was given to the constructor.
        """
        if self.init_fd is None:
            raise RuntimeError('HandleTracker has no init_fd to duplicate')
        dup_fd = os.dup(self.init_fd)
        open_kwargs = self.open_kwargs.copy()
        open_kwargs.update(kwargs)
        open_kwargs['closefd'] = True
        try:
            # noinspection PyTypeChecker
            file = io.open(dup_fd, **open_kwargs)
        except BaseException:
            # Do not leak the duplicate when io.open rejects the arguments;
            # it may already be closed on some failure paths.
            with contextlib.suppress(OSError):
                os.close(dup_fd)
            raise
        self.use_fd(dup_fd, file)
        return file

    @property
    def open_count(self):
        """Number of descriptors currently marked as in use."""
        return len(self.used_fds)

    # TODO: Add a close that doesn't process until open_count == 0, at which
    # point it closes all unused fds, and init_fd.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment