Skip to content

Instantly share code, notes, and snippets.

@dhke
Last active November 30, 2016 21:36
Show Gist options
  • Save dhke/18d25c7263f690d8bb4973c8c6531fd1 to your computer and use it in GitHub Desktop.
Save dhke/18d25c7263f690d8bb4973c8c6531fd1 to your computer and use it in GitHub Desktop.
Functions to parse imaplib2 responses into more pythonesque data
# -*- encoding=utf-8 -*-
from __future__ import print_function
from peekable import peekable
# Character classes taken from the IMAP4rev1 formal syntax (RFC 3501).
# Everything is a native string: the parsers below compare single characters
# against '' literals, and mixing b'' with '' literals raises TypeError on
# Python 3 (on Python 2 both spellings are the same type).
SP = ' '
# CTL = %x00-1F / %x7F.  Generated instead of typed out so that no code
# point (e.g. 0x0e, 0x1e, 0x7f) is accidentally left out.
CTL = ''.join(chr(code) for code in range(0x20)) + '\x7f'
LIST_WILDCARDS = '%*'
QUOTED_SPECIALS = '\\"'
RESP_SPECIALS = ']'

# Remind me to complain about the fact that the IMAP ABNF is backwards
# and only gets properly compositional once you deconstruct it.
LIST_SPECIALS = '(){' + SP + CTL + QUOTED_SPECIALS
# QUOTED_SPECIALS is already part of LIST_SPECIALS, so it is not added again.
ASTRING_SPECIALS = LIST_SPECIALS + LIST_WILDCARDS
ATOM_SPECIALS = ASTRING_SPECIALS + RESP_SPECIALS
# Atom variants that additionally stop at a path separator.
ATOM_SLASH_SPECIALS = ATOM_SPECIALS + '/'
ATOM_DOT_SPECIALS = ATOM_SPECIALS + '.'

# Unique marker used as the "no more input" default for peek()/next().
_sentinel = object()
class ParseError(ValueError):
    """Raised by the parsers in this module when the input is malformed."""
def _wrap_iter(src):
    """Return ``src`` as a ``peekable``.

    An object that already is a ``peekable`` is handed back unchanged;
    anything else is passed through ``iter()`` and wrapped in a new one.
    """
    if isinstance(src, peekable):
        return src
    return peekable(iter(src))
def _next_or_parse_error(src, message):
    """Return the next item of ``src``.

    :raises ParseError: with ``message`` if ``src`` is exhausted.
    """
    try:
        item = next(src)
    except StopIteration:
        raise ParseError(message)
    else:
        return item
def _expect(src, expected, message=None):
    """Consume one item from ``src`` and require it to be in ``expected``.

    :param src: iterator to read from.
    :param expected: container (typically a string) of acceptable items.
    :param message: optional ``str.format`` template for the mismatch error;
        it receives the keyword fields ``expected`` and ``token``.
    :returns: the consumed item.
    :raises ParseError: if ``src`` is exhausted or the item is not accepted.
    """
    template = message if message else 'Expected one of "{expected}", got "{token}"'
    eof_message = 'Expected "{}", got EOF'.format(expected)
    token = _next_or_parse_error(src, eof_message)
    if token in expected:
        return token
    raise ParseError(template.format(expected=expected, token=token))
def parse_atom(src, atom_specials=None):
    """Consume and return one atom from ``src``.

    The atom is the longest non-empty run of characters that are not in
    ``atom_specials``.  The character that ends the atom is left in the
    stream.

    :param src: iterable of single characters, or a ``peekable``.
    :param atom_specials: characters that may not appear in the atom;
        defaults to ``ATOM_SPECIALS``.
    :returns: the atom as a string.
    :raises ParseError: if the input is exhausted or the first character is
        one of ``atom_specials``.
    """
    src = _wrap_iter(src)
    atom_specials = atom_specials or ATOM_SPECIALS
    c = src.peek(_sentinel)
    # Test for EOF first: the sentinel is not a string, so using it on the
    # left of ``in`` against a string would raise TypeError.
    if c is _sentinel:
        raise ParseError('Expected atom, got EOF')
    # Validate against the caller's set, not the module default, so that
    # e.g. astrings may start with a character that plain atoms forbid.
    if c in atom_specials:
        raise ParseError('Invalid character in atom: "{0}"'.format(c))
    chars = []
    while c is not _sentinel and c not in atom_specials:
        chars.append(next(src))
        c = src.peek(_sentinel)
    return ''.join(chars)
def parse_string(src):
    """Parse a string, which is either a literal or a quoted string.

    A leading ``{`` selects ``parse_literal``; everything else is handed to
    ``parse_quoted_string``.

    :param src: iterable of single characters, or a ``peekable``.
    :returns: the string contents.
    :raises ParseError: if the input is exhausted, or from the delegated
        parser.
    """
    src = _wrap_iter(src)
    c = src.peek(_sentinel)
    if c is _sentinel:
        # The message used to say "atom", copied from parse_atom.
        raise ParseError('Expected string, got EOF')
    if c == '{':
        return parse_literal(src)
    return parse_quoted_string(src)
def parse_literal(src):
    """Parse a literal of the form ``{<count>}CRLF<count items>``.

    :param src: iterable of single characters, or a ``peekable``.
    :returns: the ``count`` items following the CRLF, joined into a string.
    :raises ParseError: if the header is malformed or fewer than ``count``
        items follow it.
    """
    src = _wrap_iter(src)
    _expect(src, '{')
    c = _next_or_parse_error(src, 'Expected octet count, got EOF')
    # Compare against ASCII digits only; str.isdigit() also accepts
    # characters (e.g. superscripts) that int() cannot convert.
    if not '0' <= c <= '9':
        raise ParseError('Expected octet count, got "{}"'.format(c))
    digits = [c]
    # Look ahead so the closing brace is not consumed, and advance on every
    # iteration (the loop previously never read a new character).
    c = src.peek(_sentinel)
    while c is not _sentinel and '0' <= c <= '9':
        digits.append(next(src))
        c = src.peek(_sentinel)
    _expect(src, '}')
    _expect(src, '\r')
    _expect(src, '\n')
    remaining = int(''.join(digits))
    octets = []
    # A counting loop instead of xrange()/range(): works on Python 2 and 3
    # and never materialises a list sized by the announced count.
    while remaining > 0:
        octets.append(_next_or_parse_error(src, 'Not enough octets in literal'))
        remaining -= 1
    return ''.join(octets)
def parse_quoted_string(src):
    """Parse a double-quoted string and return its unescaped contents.

    A backslash makes the following character literal and is itself dropped
    from the result.  The closing quote is consumed.

    :param src: iterable of single characters, or a ``peekable``.
    :raises ParseError: if the opening quote is missing or the input ends
        before an unescaped closing quote.
    """
    src = _wrap_iter(src)
    _expect(src, '"')
    pieces = []
    escaped = False
    while True:
        ch = next(src, _sentinel)
        if ch is _sentinel:
            raise ParseError('Unterminated quoted string')
        if escaped:
            # Whatever follows a backslash is taken verbatim.
            pieces.append(ch)
            escaped = False
        elif ch == '\\':
            escaped = True
        elif ch == '"':
            return ''.join(pieces)
        else:
            pieces.append(ch)
def parse_string_or_atom(src, atom_specials=None):
    """Parse either a string or an atom, chosen by the next character.

    A leading ``"`` or ``{`` selects ``parse_string``; anything else,
    including end of input, is handed to ``parse_atom``.

    :param src: iterable of single characters, or a ``peekable``.
    :param atom_specials: forwarded to ``parse_atom``; defaults to
        ``ATOM_SPECIALS``.
    :returns: the parsed string or atom.
    """
    src = _wrap_iter(src)
    atom_specials = atom_specials or ATOM_SPECIALS
    c = src.peek(_sentinel)
    # Guard the membership test: the EOF sentinel is not a string and would
    # make ``in`` raise TypeError.
    if c is not _sentinel and c in '"{':
        return parse_string(src)
    return parse_atom(src, atom_specials=atom_specials)
def parse_astring(src):
    """Parse a string, or an atom delimited by ``ASTRING_SPECIALS``."""
    result = parse_string_or_atom(src, atom_specials=ASTRING_SPECIALS)
    return result
def parse_list(src, nested_parser=None):
    """Parse a parenthesised, space-separated list.

    :param src: iterable of single characters, or a ``peekable``.
    :param nested_parser: callable invoked with the stream to parse each
        element; defaults to ``parse_string_or_atom``.
    :returns: list of the values returned by ``nested_parser``; empty for
        ``()``.
    :raises ParseError: if the opening parenthesis is missing, elements are
        not separated by a single space, or the list is not closed.
    """
    src = _wrap_iter(src)
    nested_parser = nested_parser or parse_string_or_atom
    items = []
    first = True
    _expect(src, '(')
    # Only ever *peek* inside the loop: consuming here would swallow either
    # the separator or the closing parenthesis.
    c = src.peek(_sentinel)
    while c is not _sentinel and c != ')':
        if not first:
            _expect(src, SP)
        first = False
        items.append(nested_parser(src))
        c = src.peek(_sentinel)
    # Consume the closing parenthesis, for empty lists as well.
    if next(src, _sentinel) != ')':
        raise ParseError('Unterminated list')
    return items
def parse_flag(src):
    """Parse a backslash-prefixed flag and return its name without the backslash."""
    stream = _wrap_iter(src)
    _expect(stream, '\\')
    flag_name = parse_atom(stream)
    return flag_name
def parse_list_response(src):
    """Parse ``(<flags>) <delimiter> <mailbox>``.

    :param src: iterable of single characters, or a ``peekable``, positioned
        at the opening parenthesis of the flag list.
    :returns: ``[flags, delimiter, mailbox]`` where ``flags`` is a list of
        flag names; a mailbox equal to ``INBOX`` in any letter case is
        returned as ``'INBOX'``.
    :raises ParseError: if any part is malformed.
    """
    stream = _wrap_iter(src)
    flags = parse_list(stream, nested_parser=parse_flag)
    _expect(stream, SP)
    delimiter = parse_astring(stream)
    _expect(stream, SP)
    name = parse_string_or_atom(stream, atom_specials=LIST_SPECIALS)
    mailbox = 'INBOX' if name.upper() == 'INBOX' else name
    return [flags, delimiter, mailbox]
def parse_annotation_entry(src):
    """Parse ``"<mbox>" "<attr>" ("<k1>" "<v1>" "<k2>" "<v2>" ...)``.

    :param src: iterable of single characters, or a ``peekable``.
    :returns: ``[mbox, attr, pairs]`` where ``pairs`` is a list of 2-tuples
        built from consecutive items of the parenthesised list.
    :raises ParseError: if any part is malformed or the list holds an odd
        number of items.
    """
    src = _wrap_iter(src)
    # XXX -
    # attributes are returned as a combination of atom-slash and (mostly) atom-dot
    # while this code uses quoted-string which is more lenient.
    # be sure to validate the resulting names and values in your app.
    mbox = parse_quoted_string(src)
    _expect(src, SP)
    attr = parse_quoted_string(src)
    _expect(src, SP)
    attr_values = parse_list(src, nested_parser=parse_quoted_string)
    if len(attr_values) % 2 != 0:
        raise ParseError('Attribute value-list contains odd number of entries')
    # Pair items (0, 1), (2, 3), ...  Wrapped in list() so callers get a
    # reusable list on Python 3 too, where zip() is a one-shot iterator.
    pairs = list(zip(attr_values[0::2], attr_values[1::2]))
    return [mbox, attr, pairs]
def split_mbox_path(mbox_path, sep):
    """Split ``mbox_path`` at every occurrence of ``sep`` and return the parts."""
    components = mbox_path.split(sep)
    return components
# -*- encoding=utf-8 -*-
from __future__ import print_function
from itertools import islice
from collections import deque
# Names exported by ``from <module> import *``.
__all__ = ['peekable']
# Originally from Erik Rose's more-itertools
#
# Copyright (c) 2012 Erik Rose
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Unique default that lets peek() tell "no default given" apart from None.
_marker = object()


class peekable(object):
    """Wrap an iterator to allow lookahead.

    Call ``peek()`` on the result to get the value that will next pop out of
    ``next()``, without advancing the iterator:

        >>> p = peekable(['a', 'b'])
        >>> p.peek()
        'a'
        >>> next(p)
        'a'

    Pass ``peek()`` a default value to return that instead of raising
    ``StopIteration`` when the iterator is exhausted.

        >>> p = peekable([])
        >>> p.peek('hi')
        'hi'

    You may index the peekable to look ahead by more than one item.
    The values up to the index you specified will be cached.
    Index 0 is the item that will be returned by ``next()``, index 1 is the
    item after that, and so on:

        >>> p = peekable(['a', 'b', 'c', 'd'])
        >>> p[0]
        'a'
        >>> p[1]
        'b'
        >>> next(p)
        'a'
        >>> p[1]
        'c'
        >>> next(p)
        'b'

    Slices work the same way and return a list.  Negative indices are not
    supported and raise ``ValueError``.

    To test whether there are more items in the iterator, examine the
    peekable's truth value. If it is truthy, there are more items.

        >>> assert peekable([1])
        >>> assert not peekable([])

    """

    def __init__(self, iterable):
        self._it = iter(iterable)
        # Items already pulled from ``_it`` but not yet handed out.
        self._cache = deque()

    def __iter__(self):
        return self

    def __bool__(self):
        try:
            self.peek()
        except StopIteration:
            return False
        return True

    def __nonzero__(self):
        # For Python 2 compatibility
        return self.__bool__()

    def peek(self, default=_marker):
        """Return the item that will be next returned from ``next()``.

        Return ``default`` if there are no items left. If ``default`` is not
        provided, raise ``StopIteration``.

        """
        if not self._cache:
            try:
                self._cache.append(next(self._it))
            except StopIteration:
                if default is _marker:
                    raise
                return default
        return self._cache[0]

    def __next__(self):
        if self._cache:
            return self._cache.popleft()
        return next(self._it)

    def next(self):
        # For Python 2 compatibility
        return self.__next__()

    def _get_slice(self, index):
        """Return ``list``-style slice ``index`` of the upcoming items.

        Pulls as many items into the cache as the slice can reach.

        :raises ValueError: if ``start`` or ``stop`` is negative.
        """
        start = index.start
        stop = index.stop
        if (
            ((start is not None) and (start < 0)) or
            ((stop is not None) and (stop < 0))
        ):
            raise ValueError('Negative indexing not supported')

        # Number of leading items the slice can touch; None means "all".
        # A forward slice is bounded by ``stop``.  A backward slice begins
        # at ``start`` (or at the last item when ``start`` is omitted), so
        # bounding it by ``stop`` would leave the cache too short.
        if index.step is None or index.step > 0:
            needed = stop
        elif start is None:
            needed = None
        else:
            needed = start + 1

        cache_len = len(self._cache)
        if needed is None:
            self._cache.extend(self._it)
        elif needed > cache_len:
            self._cache.extend(islice(self._it, needed - cache_len))
        return list(self._cache)[index]

    def __getitem__(self, index):
        if isinstance(index, slice):
            return self._get_slice(index)
        # A one-item slice reuses the caching logic; [0] raises IndexError
        # when the iterator is shorter than ``index``.
        return self._get_slice(slice(index, index + 1, None))[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment