Last active
January 25, 2018 18:47
-
-
Save baileyparker/32c8c9d827f30f0c2bacc5b3db1990f5 to your computer and use it in GitHub Desktop.
MetaBrainz chatbox voting plugin cleanup for codereview.stackexchange.com
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import wraps | |
from inspect import ismethod | |
class PersistentDecorator(object): | |
def __init__(self, decorated_object, save): | |
self._decorated_object = decorated_object | |
self._save = save | |
def __getattribute__(self, attr): | |
if attr.startswith('_'): | |
return super(PersistentDecorator, self).__getattribute__(attr) | |
value = getattr(self._decorated_object, attr) | |
if not ismethod(value): | |
return value | |
@wraps(value) | |
def wrap(*args, **kwargs): | |
return_value = value(*args, **kwargs) | |
self._save(self._decorated_object.to_json()) | |
return return_value | |
return wrap |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from collections import namedtuple | |
from functools import wraps | |
import re | |
from poll import Poll, OptionError | |
from persistent_decorator import PersistentDecorator | |
from ..base import BasePlugin | |
from ..decorators import (listens_to_all, listens_to_command, | |
listens_to_regex_command) | |
STORAGE_KEY = 'data' | |
SUPPORT_INDICATORS = ('+', '👍', '👍🏻', '👍🏼', '👍🏽', '👍🏾', '👍🏿') | |
OPPOSITION_INDICATORS = ('-', '👎', '👎🏻', '👎🏼', '👎🏽', '👎🏾', '👎🏿') | |
class Plugin(BasePlugin): | |
@listens_to_command('startpoll') # (1) | |
def handle_start_command(self, line, option_names): # (2) | |
"""Handles the !startpoll command. | |
When issued with no arguments, starts a binary poll. Optional | |
arguments are the option names in the poll. For example: | |
<user1> !startpoll | |
<BrainzBot> A new poll has been started. You can vote with !support, | |
!oppose, !abstain, or by starting your message with a +1 | |
or -1. See a tally of votes with !tallypoll and end the | |
poll with !endpoll. | |
<user1> !startpoll delicious_fruits vegetables | |
<BrainzBot> A new poll has been started between delicious_fruits and | |
vegetables. You can vote with !support [option], | |
!oppose [option], !abstain, or by starting your message | |
with a +[option] or -[option]. See a tally of votes with | |
!tallypoll and end the poll with !endpoll. | |
The command will report an error if a vote is ongoing. | |
""" | |
if self.current_poll: | |
return u'A poll is already ongoing. Issue !endpoll to end it.' | |
if len(option_names) == 1: | |
return u'Add 2 or more options to a poll or issue !startpoll ' \ | |
'for a binary poll' | |
self.current_poll = Poll.create(option_names) | |
if len(option_names) >= 2: | |
return u'A new poll has started. You can vote with !support, ' \ | |
u'!oppose, !abstain, or by starting your message with a ' \ | |
u'+1 or -1. See a tally of votes with !tallypoll and ' \ | |
u'end the poll with !endpoll' | |
else: | |
response = u"A new poll has started between {}. You can vote " \ | |
u"with !support [option], !oppose [option], " \ | |
u"!abstain, or by starting your message with a " \ | |
u"+[option] or -[option]. See a tally of votes " \ | |
u"with !tallypoll and end the poll with !endpoll" | |
return response.format(english_list(option_names)) # (3) | |
def requires_ongoing_poll(func): # (4) | |
@wraps(func) | |
def wrapped(self, *args, **kwargs): | |
if self.current_poll is None: | |
return u'No active poll. Start one with !startpoll' | |
return func(self, *args, **kwargs) | |
return wrapped | |
@listens_to_command('support') | |
@requires_ongoing_poll | |
def handle_support_command(self, line, args): | |
if len(args) > 1: | |
return u'Support expects either no arguments or one argument.' | |
try: | |
partial_option_name = next(args, None) | |
self.current_poll.support(line.user, partial_option_name) | |
except OptionError as e: | |
return e.message | |
@listens_to_command('oppose') | |
@requires_ongoing_poll | |
def handle_oppose_command(self, line, args): | |
if len(args) > 1: | |
return u'Oppose expects either no arguments or one argument.' | |
try: | |
partial_option_name = next(args, None) | |
self.current_poll.oppose(line.user, partial_option_name) | |
except OptionError as e: | |
return e.message | |
INDICATORS = r'|'.join(map(re.escape, | |
SUPPORT_INDICATORS + OPPOSITION_INDICATORS)) | |
@listens_to_all(r'^(?<indicator>(' + INDICATORS + r'))(?<option>\W+)') | |
def handle_message_with_possible_vote(self, line, indicator, option): | |
if self.current_poll is None: | |
# Must not have been a vote | |
return | |
try: | |
if indicator in SUPPORT_INDICATORS: | |
self.current_poll.support(line.user, option) | |
else: | |
self.current_poll.oppose(line.user, option) | |
except OptionError as e: | |
return e.message | |
@listens_to_command('abstain') | |
@requires_ongoing_poll | |
def handle_abstain_command(self, line, args): | |
if len(args) > 0: | |
return u'Abstain expects no arguments. Issue just !abstain' | |
self.current_poll.oppose(line.user) | |
@listens_to_command('tallypoll') | |
@requires_ongoing_poll | |
def handle_tally_command(self, line, args): | |
if len(args) > 0: | |
return u'Tally expects no arguments. Issue just !tallypoll' | |
# NOTE: tally() returns a Tally, see below | |
return str(self.current_poll.tally()) | |
@listens_to_command('endpoll') | |
@requires_ongoing_poll | |
def handle_endvote(self, line, args): | |
if len(args) > 0: | |
return u'Tally expects no arguments. Issue just !endpoll' | |
tally = '' | |
if self.current_poll.changed_since_last_tally: | |
# NOTE: tally() returns a Tally, see below | |
tally = u"{} ".format(self.current_poll.tally()) | |
self.current_poll = None | |
return u"{}Poll ended.".format(tally) | |
@property | |
def current_poll(self): | |
# If _current_poll doesn't exist, a poll may be in storage | |
if not hasattr(self, '_current_poll'): | |
self._current_poll = self.retrieve(STORAGE_KEY) | |
# If there was a poll in storage, deserialize it | |
if self._current_poll is not None: | |
self._current_poll = \ | |
self._wrap_poll(Poll.from_json(self._current_poll)) | |
return self._current_poll | |
@current_poll.setter | |
def current_poll(self, poll): | |
self._current_poll = self._wrap_poll(poll) | |
self.store(STORAGE_KEY, poll.to_json()) | |
def _wrap_poll(self, poll): | |
return PersistentDecorator(poll, partial(self.store, STORAGE_KEY)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
import json | |
import operator | |
BINARY_OPTION_NAME = '1' | |
class Poll(object): | |
@classmethod | |
def create(cls, option_names): | |
# Support a binary poll with no named options | |
if len(option_names) == 0: | |
option_names = [BINARY_OPTION_NAME] | |
options = {option: (set(), set()) for option in option_names} | |
return Poll(options, set(), changed_since_last_tally=True) | |
@classmethod | |
def from_json(cls, serialized): | |
data = json.loads(serialized) | |
options = \ | |
{name: (set(supporters), set(opposers)) | |
for name, (supporters, opposers) in data['options'].iteritems()} | |
abstains = set(data['abstains']) | |
changed_since_last_tally = data['changed_since_last_tally'] | |
return Poll(options, abstains, changed_since_last_tally) | |
def __init__(self, options, abstains, changed_since_last_tally): | |
self._options = options | |
self._abstains = abstains | |
self._changed_since_last_tally = changed_since_last_tally | |
def support(self, user, partial_option_name=None): | |
supporters, opposers = self._get_option(partial_option_name) | |
if user in supporters: | |
# No changes necessary | |
return | |
self._clear_vote(user) | |
supporters.add(user) | |
opposers.remove(user) | |
self._changed_since_last_tally = True | |
def oppose(self, user, partial_option_name=None): | |
supporters, opposers = self._get_option(partial_option_name) | |
if user in opposers: | |
# No changes necessary | |
return | |
self._clear_vote(user) | |
supporters.remove(user) | |
opposers.add(user) | |
self._changed_since_last_tally = True | |
def abstain(self, user): | |
if user in self._abstains: | |
# No changes necessary | |
return | |
# Clear any other votes the user has cast | |
for supporters, opposers in self._options.itervalues(): | |
supporters.remove(user) | |
opposers.remove(user) | |
self._abstains.add(user) | |
self._changed_since_last_tally = True | |
def _get_option(self, partial_option_name): | |
if partial_option_name is None: | |
if len(self._options) != 1: | |
raise OptionError(partial_option_name, self._options.keys()) | |
return self._options[BINARY_OPTION_NAME] | |
# Consider options in sorted order, finding the first that has | |
# partial_option_name as a prefix to support partially typed option | |
# names | |
options = sorted(self._options.iteritems(), | |
key=operator.itemgetter(0)) | |
for option_name, (supporters, opposers) in options: | |
if option_name.startswith(partial_option_name): | |
return supporters, opposers | |
raise OptionError(partial_option_name, self._options.keys()) | |
def tally(self): | |
self._changed_since_last_tally = False | |
options = \ | |
[Option(name, list(supporters), list(opposers)) | |
for name, (supporters, opposers) in self._options.iteritems()] | |
return Tally(options, list(self.abstains)) | |
@property | |
def changed_since_last_tally(self): | |
return self._changed_since_last_tally | |
def to_json(self): | |
data = {'options': self._options, | |
'abstains': self._abstains, | |
'changed_since_last_tally': self._changed_since_last_tally} | |
return json.dumps(data) | |
class Tally(namedtuple('Tally', ('options', 'abstains'))): | |
__slots__ = () | |
def __str__(self): | |
# NOTE: Python 3.6 f-strings would make this much cleaner | |
options = u' '.join(map(str, self.options)) | |
abstaining = u"[abstain ({num}): {names}]" \ | |
.format(num=len(self.abstains), names=', '.join(self.abstains)) | |
return u'{} {}'.format(options, abstaining) | |
class Option(namedtuple('Option', ('name', 'supporters', 'opposers'))): | |
__slots__ = () | |
def __str__(self): | |
# NOTE: Python 3.6 f-strings would make this much cleaner | |
names = u'; '.join(u', '.join(name_list) | |
for name_list in (self.supporters, self.opposers)) | |
return u"[{name}(+{num_supporters}, -{num_opposers}): {names}]" \ | |
.format(self.name, num_supporters=len(self.supporters), | |
num_opposers=len(self.opposers), names=names) | |
class OptionError(Exception): | |
def __init__(self, bad_name, options): | |
if len(options) == 1: | |
msg = u'poll is binary (try +1 or -1, instead of a named option)' | |
else: | |
msg = u'expected one of the following options: {}' \ | |
.format(u', '.join(options)) | |
super(OptionError, self).__init__(msg) | |
self.bad_name = bad_name | |
self.options = options |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment