Last active
December 28, 2018 20:39
-
-
Save etienned/34d8247756a1a49110c08385b85d634b to your computer and use it in GitHub Desktop.
Tests and code for CFPreferences class of Ansible osx_defaults module.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
from base64 import b64decode | |
import calendar | |
import contextlib | |
import datetime | |
import os | |
import re | |
import shutil | |
import string | |
import time | |
import CoreFoundation | |
import Foundation | |
from copy import deepcopy | |
from ansible.module_utils.six import string_types, binary_type, text_type | |
from ansible.module_utils.six import integer_types | |
import pytest | |
# Regular expression that match datetime formats. Should match mostly the | |
# same formats that YAML support. This regex is adapted from the one defined | |
# in the YAML specifications <http://yaml.org/type/timestamp.html>. | |
# It is also almost conforming to the ISO 8601 format | |
# <https://en.wikipedia.org/wiki/ISO_8601>. | |
RE_DATETIME = re.compile(r""" | |
# year-month-day | |
(?P<year>\d{4})-(?P<month>[0-1]?[0-9])-(?P<day>[0-3]?[0-9]) | |
# start of optional time section | |
(?: | |
# time separator | |
(?:\ +|[Tt]) | |
# hour-minute-second | |
(?P<hour>[0-2]?[0-9]):(?P<minute>[0-5][0-9]):(?P<second>[0-5][0-9]) | |
# optional microsecond | |
(?:\.(?P<microsecond>\d*))? | |
# optional timezone info | |
(?P<timezone>\ *Z|\ *[-+][0-2]?[0-9](?::?[0-5][0-9])?)? | |
)?$ | |
""", re.VERBOSE) | |
def string_to_datetime(value): | |
""" | |
Convert a date and time string to a datetime object. | |
String need to be similar to the Combined date and time ISO 8601 form | |
(YYYY-MM-DDThh:mm:ss+00:00). | |
Local timezone offset will be added to dates without timezone. | |
""" | |
if not isinstance(value, string_types): | |
raise ValueError('Value need to be a string.') | |
match = RE_DATETIME.match(value) | |
if match: | |
timezone = match.group('timezone') | |
offset = None | |
if timezone: | |
timezone = timezone.lstrip() | |
if timezone == 'Z': | |
offset = 0 | |
else: | |
if ':' in timezone: | |
hour, minute = timezone.split(':') | |
elif len(timezone) > 3: | |
hour, minute = timezone[0:3], timezone[3:5] | |
else: | |
hour, minute = timezone, 0 | |
sign, hour = hour[0], hour[1:] | |
offset = ((int(hour) * 60) + int(minute)) * 60 | |
if sign == '-': | |
offset = -offset | |
local_datetime = datetime.datetime( | |
int(match.group('year')), | |
int(match.group('month')), | |
int(match.group('day')), | |
int(match.group('hour')) if match.group('hour') else 0, | |
int(match.group('minute')) if match.group('minute') else 0, | |
int(match.group('second')) if match.group('second') else 0, | |
# Skip microsecond because they are unsupported in CF. | |
0, | |
) | |
class Offset(datetime.tzinfo): | |
def __init__(self, local_datetime, offset=None): | |
if offset is None: | |
# Get local timezone offset for the specified date. | |
timestamp = calendar.timegm(local_datetime.timetuple()) | |
local_datetime = datetime.datetime.fromtimestamp(timestamp) | |
utc_datetime = datetime.datetime.utcfromtimestamp(timestamp) | |
self.__offset = local_datetime - utc_datetime | |
else: | |
self.__offset = datetime.timedelta(0, offset) | |
def utcoffset(self, dt=None): | |
return self.__offset | |
timezone = Offset(local_datetime, offset) | |
return local_datetime.replace(tzinfo=timezone) | |
raise ValueError( | |
'Invalid string format for datetime: `{0}`'.format(value) | |
) | |
def equivalent_types(value1, value2): | |
""" | |
Compare type of two values and return if they are equivalent or not. | |
Objective-C types are considered equivalent to their corresponding | |
python types. | |
For example, `objc.pyobjc_unicode` is equivalent to `unicode`. | |
""" | |
supported_types = ( | |
bool, | |
integer_types, | |
float, | |
(text_type, binary_type), | |
datetime.datetime, | |
Data, | |
list, | |
dict, | |
) | |
for value_type in supported_types: | |
if isinstance(value1, value_type) and isinstance(value2, value_type): | |
return True | |
return False | |
class Data(binary_type): | |
""" | |
Object representing binary data. | |
Instance should be initialize with binary data encoded in base64 codec. | |
""" | |
# List of all Base64 accepted characters. | |
BASE64_CHARS = (string.ascii_letters + string.digits + '+/=').encode('ascii') | |
# List of all text characters. | |
TEXT_CHARS = bytearray(set([7, 8, 9, 10, 12, 13, 27]) | set(range(0x20, 0x100)) - set([0x7f])) | |
def __new__(cls, data): | |
if isinstance(data, cls): | |
return data | |
# Try to convert unicode to ascii string. | |
if isinstance(data, text_type): | |
try: | |
data = data.encode('ascii') | |
except UnicodeEncodeError: | |
pass | |
# Check if data is a valid base64 string. Short strings are not | |
# considered as binary. | |
if isinstance(data, binary_type) and len(data) > 51 and not data.translate(None, cls.BASE64_CHARS): | |
try: | |
binary_data = b64decode(data) | |
except TypeError: | |
pass | |
else: | |
if cls.is_binary(binary_data): | |
return super(Data, cls).__new__(cls, data) | |
raise ValueError('Unsupported data type.') | |
@classmethod | |
def is_binary(cls, data): | |
""" Check if data looks like binary data and not textual data. """ | |
if b'\x00' in data: | |
return True | |
# Check only first 512 characters. | |
data = data[:512] | |
# If more than 30% are non-text characters, then this is considered | |
# binary data. | |
return len(data.translate(None, cls.TEXT_CHARS)) / float(len(data)) > 0.3 | |
@property | |
def binary(self): | |
return b64decode(self) | |
class CFPreferences(object): | |
""" | |
Read, write and delete value for specified keys and indexes from macOS | |
Preferences files (.plist). It's possible to access nested values and to | |
write complex nested values. All types in written nested values should be | |
supported by the .plist format: bool, int, float, unicode, datetime, | |
binary data (as base64 string), list and dict. | |
This class uses CoreFoundation python binding to access .plist. | |
""" | |
def __init__(self, domain, any_user=False, host=None): | |
""" | |
Domain should be the identifier of the application whose preferences to | |
read or modify. Takes the form of a Java package name, com.foosoft.app | |
or `NSGlobalDomain` for global domain. `any_user` control if the | |
preference is for the current user only (default) or for any user. | |
`host` control if the preference is for the current host only or for | |
any host (default). | |
""" | |
if any_user: | |
self.user = Foundation.kCFPreferencesAnyUser | |
else: | |
self.user = Foundation.kCFPreferencesCurrentUser | |
if host == 'currentHost': | |
self.host = Foundation.kCFPreferencesCurrentHost | |
elif host is None: | |
self.host = Foundation.kCFPreferencesAnyHost | |
else: | |
# Keep it to be backward compatible, but that not look to be really | |
# supported by the API. Behavior of defaults with host given as a | |
# string look undefined anyway. Should probably be remove in the | |
# future if nobody prove it's actually functional. | |
self.host = host | |
self.domain = domain | |
@property | |
def domain(self): | |
return self._domain | |
@domain.setter | |
def domain(self, domain): | |
# Be sure domain is a string/unicode. If not, that will trigger | |
# a "Trace/BPT trap" crash. | |
if not isinstance(domain, string_types): | |
raise TypeError('Domain should be a string or unicode.') | |
if domain == 'NSGlobalDomain': | |
domain = Foundation.kCFPreferencesAnyApplication | |
self._domain = domain | |
def read(self, key): | |
""" | |
Read a preference value for the specified key. Nested values can be | |
access by giving all keys and indexes, separated by colons (:). | |
Indexes are zero-based. | |
Example: 'NSToolbar Configuration Browser:TB Item Identifiers:1' | |
Here we assume that the plist root is a dict, not an array. So first | |
key's level should always be a key (string) not an index (int). | |
""" | |
# Can't pass an array index to CFPreferencesCopyAppValue, | |
# we could probably read the entire plist in this case. | |
keys_n_idxs = self._split_keys_n_idxs(key) | |
# Get value/structure of the first level key. | |
if self._is_current_app_and_user(): | |
value = CoreFoundation.CFPreferencesCopyAppValue( | |
keys_n_idxs[0], self.domain | |
) | |
else: | |
value = CoreFoundation.CFPreferencesCopyValue( | |
keys_n_idxs[0], self.domain, self.user, self.host | |
) | |
# If there's more then one key level, follow the structure until the | |
# last level is reach or return None if some substructures are missing. | |
for key_or_idx in keys_n_idxs[1:]: | |
try: | |
value = value[key_or_idx] | |
except (KeyError, IndexError, TypeError, ValueError): | |
return None | |
value = self._normalize_to_python(value) | |
return value | |
def write(self, key, value, array_add=False): | |
""" | |
Write a preference value for the specified key. Nested values can be | |
written by giving all keys and indexes separated by colons (:). | |
Indexes are zero-based. | |
Example: 'NSToolbar Configuration Browser:TB Item Identifiers:1' | |
It's possible to write complex nested values. All types in written | |
nested values should be supported by the .plist format: bool, int, | |
float, unicode, datetime, binary data (as base64 string), list | |
and dict. | |
With array_add argument as True value can be an item or a list. | |
Item will be appended to the current array and list will extend | |
current array. | |
""" | |
keys_n_idxs = self._split_keys_n_idxs(key) | |
root = node = self._get_tree(keys_n_idxs) | |
# Add list and dict that are missing. | |
for key_or_idx, next_key_or_idx in zip(keys_n_idxs, keys_n_idxs[1:]): | |
self._validate_key_node(key_or_idx, node) | |
# Add missing list and dict. | |
if isinstance(node, list): | |
if key_or_idx > len(node): | |
raise IndexError( | |
'Index {0} in key `{1}` out of range.' | |
.format(key_or_idx, key) | |
) | |
if key_or_idx == len(node): | |
node.append([] if isinstance(next_key_or_idx, int) else {}) | |
elif key_or_idx not in node: | |
node[key_or_idx] = [] if isinstance(next_key_or_idx, int) else {} | |
node = node[key_or_idx] | |
# Set final value. | |
last_key_or_idx = keys_n_idxs[-1] | |
self._validate_key_node(last_key_or_idx, node) | |
if array_add: | |
if isinstance(node, list): | |
# If index doesn't exist, raise error except if it's the next one. | |
if last_key_or_idx > len(node): | |
raise IndexError( | |
'Index {0} in key `{1}` out of range.' | |
.format(last_key_or_idx, key) | |
) | |
if last_key_or_idx == len(node): | |
node.append([]) | |
else: # it's a dict. | |
if last_key_or_idx not in node: | |
node[last_key_or_idx] = [] | |
if not isinstance(node[last_key_or_idx], list): | |
raise TypeError( | |
"With array_add end node should be a list and it's not." | |
) | |
# Add only items that are not already present in the current list, | |
# and preserve order. | |
items_to_add = [ | |
item for item in value if item not in node[last_key_or_idx] | |
] | |
node[last_key_or_idx].extend(items_to_add) | |
elif isinstance(node, list): | |
# If index doesn't exist, raise error except if it's the next one. | |
if last_key_or_idx > len(node): | |
raise IndexError( | |
'Index {0} in key `{1}` out of range.' | |
.format(last_key_or_idx, key) | |
) | |
if last_key_or_idx == len(node): | |
node.append(value) | |
elif equivalent_types(node[last_key_or_idx], value): | |
node[last_key_or_idx] = value | |
else: | |
raise TypeError( | |
'New value type does not match current value type for key ' | |
'{0} ({1!r} {2} -> {3!r} {4}).' | |
.format( | |
last_key_or_idx, value, type(value), | |
node[last_key_or_idx], type(node[last_key_or_idx]) | |
) | |
) | |
else: # it's a dict. | |
if (last_key_or_idx in node and not | |
equivalent_types(node[last_key_or_idx], value)): | |
raise TypeError( | |
'New value type does not match current value type for key ' | |
'{0} ({1!r} {2} -> {3!r} {4}).' | |
.format( | |
last_key_or_idx, value, type(value), | |
node[last_key_or_idx], type(node[last_key_or_idx]) | |
) | |
) | |
node[last_key_or_idx] = value | |
# Update the plist. | |
value = root[keys_n_idxs[0]] | |
self._set_plist(keys_n_idxs[0], value) | |
def delete(self, key): | |
""" | |
Delete a preference value for the specified key. Nested values can be | |
access by giving all keys and indexes, separated by colons (:). | |
Indexes are zero-based. | |
Example: 'NSToolbar Configuration Browser:TB Item Identifiers:1' | |
Here we assume that the plist root is a dict, not an array. So first | |
key's level should always be a key (string) not an index (int). | |
If the key doesn't exist this function return None. | |
""" | |
keys_n_idxs = self._split_keys_n_idxs(key) | |
root = node = self._get_tree(keys_n_idxs) | |
for key_or_idx in keys_n_idxs[:-1]: | |
try: | |
node = node[key_or_idx] | |
except (IndexError, KeyError, TypeError, ValueError): | |
# That means there's nothing to delete. | |
return | |
last_key_or_idx = keys_n_idxs[-1] | |
key_or_idx_type = list if isinstance(last_key_or_idx, int) else dict | |
if not isinstance(node, key_or_idx_type): | |
# That means there's nothing to delete. | |
return | |
if isinstance(node, list): | |
if last_key_or_idx < len(node): | |
node.pop(last_key_or_idx) | |
elif last_key_or_idx in node: | |
del node[last_key_or_idx] | |
# Update the plist. | |
value = root.get(keys_n_idxs[0]) | |
self._set_plist(keys_n_idxs[0], value) | |
def _normalize_to_python(self, value): | |
""" | |
Return value with all Foundation types converted to their python | |
equivalent. | |
""" | |
if isinstance(value, (Foundation.NSMutableDictionary, dict)): | |
value = dict(value) | |
for key, item in value.items(): | |
value[key] = self._normalize_to_python(item) | |
elif isinstance(value, (Foundation.NSMutableArray, list, tuple)): | |
value = [self._normalize_to_python(item) for item in value] | |
elif isinstance(value, Foundation.NSDate): | |
value = string_to_datetime(text_type(value)) | |
elif isinstance(value, Foundation.NSMutableData): | |
value = Data(value.base64Encoding()) | |
return value | |
def _normalize_to_cf(self, value): | |
""" | |
Return value with all python datetime and Data objects converted | |
to their CoreFoundation equivalent. Python strings are converted | |
to unicode. | |
If value contains a type not supported by the .plist format, | |
a TypeError will be raise. | |
""" | |
if isinstance(value, dict): | |
for key, item in value.items(): | |
value[key] = self._normalize_to_cf(item) | |
elif isinstance(value, (list, tuple)): | |
value = [self._normalize_to_cf(item) for item in value] | |
elif isinstance(value, datetime.datetime): | |
value = self._datetime_to_cfdate(value) | |
elif isinstance(value, Data): | |
value = value.binary | |
value = CoreFoundation.CFDataCreate(None, value, len(value)) | |
elif isinstance(value, binary_type): | |
try: | |
value = text_type(value, 'utf-8') | |
except UnicodeDecodeError: | |
raise TypeError( | |
'Invalid string {0} of value `{1}` is unsupported.' | |
.format(type(value), repr(value)) | |
) | |
elif (value is not None and | |
not isinstance(value, integer_types) and | |
not isinstance(value, (bool, float, text_type))): | |
raise TypeError('{0} of value `{1}` is unsupported.'.format( | |
type(value), repr(value) | |
)) | |
return value | |
def _datetime_to_cfdate(self, date_time): | |
""" | |
Convert python datetime object to a Core Foundation CFDate object. | |
""" | |
offset = date_time.utcoffset() | |
if offset is None: | |
# Get local timezone offset when datetime have no timezone. | |
timestamp = calendar.timegm(date_time.timetuple()) | |
local_date_time = datetime.datetime.fromtimestamp(timestamp) | |
utc_date_time = datetime.datetime.utcfromtimestamp(timestamp) | |
offset = local_date_time - utc_date_time | |
# Get timezone offset from datetime object. | |
offset = (offset.days * 60 * 60 * 24) + offset.seconds | |
# Need to redirect PyObjC errors that are not errors. | |
with silence_stderr(): | |
gregorian_date = CoreFoundation.CFCalendarCreateWithIdentifier( | |
None, CoreFoundation.kCFGregorianCalendar | |
) | |
timezone = CoreFoundation.CFTimeZoneCreateWithTimeIntervalFromGMT( | |
None, offset | |
) | |
CoreFoundation.CFCalendarSetTimeZone(gregorian_date, timezone) | |
absolute_time = CoreFoundation.CFCalendarComposeAbsoluteTime( | |
gregorian_date, None, b"yMdHms", | |
date_time.year, date_time.month, date_time.day, | |
date_time.hour, date_time.minute, date_time.second | |
)[1] | |
cfdate = CoreFoundation.CFDateCreate(None, absolute_time) | |
return cfdate | |
def _split_keys_n_idxs(self, key_string): | |
""" Split key string in a list of keys and indexes (as int). """ | |
if not isinstance(key_string, string_types): | |
raise TypeError('Key should be a string. {0} {1}'.format(repr(key_string), type(key_string))) | |
keys_n_idxs = [ | |
int(key_or_idx) if key_or_idx.isdigit() else key_or_idx | |
for key_or_idx in key_string.strip(':').split(':') | |
] | |
# Be sure first key is a string. If not, that can trigger | |
# a "Trace/BPT trap" crash. | |
if not isinstance(keys_n_idxs[0], string_types): | |
raise TypeError('First key should be a string.') | |
return keys_n_idxs | |
def _is_current_app_and_user(self): | |
return (self.domain != Foundation.kCFPreferencesAnyApplication and | |
self.user != Foundation.kCFPreferencesAnyUser and | |
self.host == Foundation.kCFPreferencesAnyHost) | |
def _get_tree(self, keys_n_idxs): | |
""" | |
Return the tree that contains all the keys and indexes from the .plist. | |
""" | |
root = {} | |
tree = self.read(keys_n_idxs[0]) | |
if tree is not None: | |
root[keys_n_idxs[0]] = tree | |
return root | |
def _validate_key_node(self, key_or_idx, node): | |
key_or_idx_type = list if isinstance(key_or_idx, int) else dict | |
if not isinstance(node, key_or_idx_type): | |
raise TypeError( | |
'Type mismatch between the key `{0}` and the node `{1}` ' | |
'({2} -> {3}).' | |
.format(key_or_idx, repr(node), key_or_idx_type, type(node)) | |
) | |
def _set_plist(self, key, value): | |
""" Save the value for the key to the .plist and update the cache. """ | |
value = self._normalize_to_cf(value) | |
if self._is_current_app_and_user(): | |
CoreFoundation.CFPreferencesSetAppValue(key, value, self.domain) | |
CoreFoundation.CFPreferencesAppSynchronize(self.domain) | |
else: | |
CoreFoundation.CFPreferencesSetValue( | |
key, value, self.domain, self.user, self.host | |
) | |
CoreFoundation.CFPreferencesSynchronize( | |
self.domain, self.user, self.host | |
) | |
@contextlib.contextmanager | |
def silence_stderr(): | |
""" Prevent standard error from the PyObjC bridge to show up. """ | |
dev_null = os.open(os.devnull, os.O_RDWR) | |
save_stderr = os.dup(2) | |
os.dup2(dev_null, 2) | |
yield | |
os.dup2(save_stderr, 2) | |
os.close(dev_null) | |
def _auto_cast_type(value, first_level=True): | |
""" | |
Cast booleans, integers and floats given as string on first level (not | |
nested) to their proper type. It's currently useful to do this because | |
Ansible convert integers and floats found in first level variables to | |
string (but in nested structure, integers and floats keep their type). | |
Date strings are always cast to datetime objects because dates are | |
always given as string (Because JSON do not support datetime type). | |
Binary data encoded in base64 is always converted to Data object. | |
Strings are always converted to unicode objects. | |
It's possible to keep all those cases as string by specifying their | |
type: `type: string`. | |
""" | |
if isinstance(value, (text_type, binary_type)): | |
if first_level and isinstance(value, string_types): | |
if '.' in value: | |
try: | |
return float(value) | |
except ValueError: | |
pass | |
else: | |
try: | |
return int(value) | |
except ValueError: | |
pass | |
if value.lower() in ('on', 'true', 'yes'): | |
return True | |
if value.lower() in ('off', 'false', 'no'): | |
return False | |
try: | |
return string_to_datetime(value) | |
except ValueError: | |
pass | |
try: | |
return Data(value) | |
except ValueError: | |
pass | |
if isinstance(value, binary_type): | |
try: | |
return value.decode('utf-8') | |
except UnicodeDecodeError: | |
raise Exception('String is not valid UTF-8.') | |
elif isinstance(value, list): | |
return [_auto_cast_type(item, False) for item in value] | |
elif isinstance(value, dict): | |
return dict([(key, _auto_cast_type(item, False)) for key, item in value.items()]) | |
return value | |
BINARY_DATA = ( | |
b'AAAAAAF2AAIAAQNEaXgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADMtZFkSCsAAAAITsIQbWFjL' | |
b'WRldi1wbGF5Ym9vawAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' | |
b'AAAAAAAT75MNKbeWEAAAAAAAAAAP////8AAAkgAAAAAAAAAAAAAAAAAAAABWRldmVsAAAQAAg' | |
b'AAMy1yaQAAAARAAgAANKbv7EAAAABABAACE7CAAf6YgAH+joAAABiAAIAMkRpeDpVc2Vyczpl' | |
b'dGllbm5lOkRvY3VtZW50czpkZXZlbDptYWMtZGV2LXBsYXlib29rAA4AIgAQAG0AYQBjAC0AZ' | |
b'ABlAHYALQBwAGwAYQB5AGIAbwBvAGsADwAIAAMARABpAHgAEgAuVXNlcnMvZXRpZW5uZS9Eb2' | |
b'N1bWVudHMvZGV2ZWwvbWFjLWRldi1wbGF5Ym9vawATAAEvAAAVAAIADv//AAA=' | |
) | |
@pytest.fixture(scope='module') | |
def preferences(request): | |
domain = 'com.ansible.osx_defaults6' | |
name = domain + '.plist' | |
path = os.path.expanduser('~/Library/Preferences/') | |
shutil.copy(name, path) | |
preferences = CFPreferences(domain) | |
def delete(): | |
# This loop is needed to check when the temporary file with the new | |
# values created by the system will be replacing the real plist. | |
# Sometimes this can take many seconds. Without this sometimes the | |
# real plist is not deleted. | |
while len([n for n in os.listdir(path) if name in n]) > 1: | |
time.sleep(0.1) | |
try: | |
os.remove(os.path.join(path, name)) | |
except OSError: | |
pass | |
request.addfinalizer(delete) | |
return preferences | |
class Offset(datetime.tzinfo): | |
def __init__(self, local_datetime, offset=None): | |
if offset is None: | |
# Get local timezone offset for the specified date. | |
timestamp = calendar.timegm(local_datetime.timetuple()) | |
local_datetime = datetime.datetime.fromtimestamp(timestamp) | |
utc_datetime = datetime.datetime.utcfromtimestamp(timestamp) | |
self.__offset = local_datetime - utc_datetime | |
else: | |
self.__offset = datetime.timedelta(0, offset) | |
def utcoffset(self, dt=None): | |
return self.__offset | |
LOCAL_DATETIME = datetime.datetime(2016, 1, 10, 15, 2, 50, 0) | |
TZOFFSET = Offset(LOCAL_DATETIME) | |
LOCAL_DATETIME = LOCAL_DATETIME.replace(tzinfo=TZOFFSET) | |
class TestRead: | |
def test_bad_domain_type(self): | |
with pytest.raises(TypeError) as excinfo: | |
CFPreferences(True) | |
assert 'Domain should be a string' in str(excinfo.value) | |
def test_bad_key_type(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.read(22) | |
assert 'Key should be a string' in str(excinfo.value) | |
@pytest.mark.parametrize('key,value', [ | |
('Bool1', True), | |
('Int1', 2015), | |
('Float1', 2.1), | |
('String1', u'True'), | |
('Datetime1', LOCAL_DATETIME), | |
('Data1', BINARY_DATA), | |
('Array1', [1, 2, 3]), | |
('Dict1', {'a': u'A', 'b': u'B', 'c': u'C'}), | |
]) | |
def test_basic_type(self, preferences, key, value): | |
out = preferences.read(key) | |
assert out == value | |
@pytest.mark.parametrize('key', [ | |
'NonexistentKey', 'Bool1:Int1', 'String1:3:Ouch' | |
]) | |
def test_nonexistant_key(self, preferences, key): | |
out = preferences.read(key) | |
assert out is None | |
@pytest.mark.parametrize('key,value', [ | |
('Nested1:Nested11', True), | |
(':Nested2:0:', True), | |
('Nested1:Nested12:2:Nested121', True), | |
]) | |
def test_nested_keys(self, preferences, key, value): | |
out = preferences.read(key) | |
assert out == value | |
def test_complex_value(self, preferences): | |
out = preferences.read('ComplexValue1') | |
value = [ | |
{ | |
'Bool2': False, | |
'Int2': 42, | |
'Float2': 2.7, | |
'String2': u'Yes', | |
'Datetime2': LOCAL_DATETIME, | |
'Data2': BINARY_DATA, | |
'Array2': [1, 2, 3], | |
'Dict2': {'Bool3': True}, | |
}, | |
u'A' | |
] | |
assert out == value | |
def test_nsglobaldomain(self): | |
preferences = CFPreferences('NSGlobalDomain') | |
out = preferences.read('Bool1') | |
assert out is None | |
def test_anyuser(self): | |
preferences = CFPreferences('com.ansible.osx_defaults', any_user=True) | |
out = preferences.read('SomeKey') | |
assert out is None | |
def test_nsglobaldomain_anyuser(self): | |
preferences = CFPreferences('NSGlobalDomain', any_user=True) | |
out = preferences.read('Bool1') | |
assert out is None | |
def test_nonexistant_domain(self): | |
preferences = CFPreferences('com.ansible.nonexistant_read') | |
out = preferences.read('Bool1') | |
assert out is None | |
class TestEquality: | |
@pytest.mark.parametrize('key,value', [ | |
('Bool1', True), | |
('Int1', 2015), | |
('Float1', 2.1), | |
('String1', u'True'), | |
('Datetime1', LOCAL_DATETIME), | |
('Data1', BINARY_DATA), | |
('Array1', [1, 2, 3]), | |
('Dict1', {'a': u'A', 'b': u'B', 'c': u'C'}), | |
]) | |
def test_basic_type(self, preferences, key, value): | |
out = preferences.read(key) | |
assert out == value | |
class TestWrite: | |
BASE_VALUES = [ | |
('Bool4', True), | |
('Int4', 2016), | |
('Float4', 23.4), | |
('Unicode4', u'Caleçon long\nGrrr'), | |
('String4', 'Caleçon long\nGrrr'), | |
('Datetime4', datetime.datetime.now()), | |
('Datetime5', '2002-02-21 14:12:45.3Z'), | |
('Datetime6', '2002-02-21 14:12:45.3-03:30'), | |
('Datetime7', '2002-02-21 14:12:45.3+03'), | |
('Datetime8', '2002-02-21 14:12:45'), | |
('Data4', BINARY_DATA), | |
('Array4', [1, 2, 3, 4, 5, u'665']), | |
('Dict4', {'a': u'A', 'b': True, 'c': 22}), | |
] | |
def write_read(self, preferences, key, value): | |
value = _auto_cast_type(value) | |
# Use a copy because preferences.write can modify mutable value. | |
if isinstance(value, (list, dict)): | |
copy = deepcopy(value) | |
else: | |
copy = value | |
preferences.write(key, copy) | |
out = preferences.read(key) | |
value = self.normalize_datetime(value) | |
assert value == out | |
def normalize_datetime(self, value): | |
if isinstance(value, datetime.datetime): | |
value = value.replace(microsecond=0) | |
if value.tzinfo is None or value.tzinfo.utcoffset(value) is None: | |
value = value.replace(tzinfo=Offset(value)) | |
elif isinstance(value, list): | |
return [self.normalize_datetime(item) for item in value] | |
elif isinstance(value, dict): | |
return dict([(key, self.normalize_datetime(item)) for key, item in value.items()]) | |
return value | |
def test_bad_key_type(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write(22, u'22') | |
assert 'Key should be a string' in str(excinfo.value) | |
def test_bad_first_key_type(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write('22', u'22') | |
assert 'First key should be a string' in str(excinfo.value) | |
def test_bad_value_type(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write('String1', None) | |
assert 'New value type does not match current value type for key' in str(excinfo.value) | |
def test_bad_string_value(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write('String1', b'\x99') | |
assert '` is unsupported.' in str(excinfo.value) | |
@pytest.mark.parametrize('key,value', BASE_VALUES) | |
def test_basic_type(self, preferences, key, value): | |
self.write_read(preferences, key, value) | |
@pytest.mark.parametrize('key,value', BASE_VALUES + [ | |
('Bool4', False), | |
('Int4', 1016), | |
('Float4', 1.777), | |
('String4', u'Zippp'), | |
('Datetime4', datetime.datetime.utcnow()), | |
('Data4', ( | |
'AAAFAAF2AAIAAQNEaXgAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAADMtZFkSCsAAAAIT' | |
'sIQbWFjLWRldi1wbGF5Ym9vawAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA' | |
'AAAAAAAAAAAAAAAAAAAAAAAT75MNKbeWEAAAAAAAAAAP////8AAAkgAAAAAAAAAAA' | |
'AAAAAAAAABWRldmVsAAAQAAgAAMy1yaQAAAARAAgAANKbv7EAAAABABAACE7CAAf6' | |
'YgAH+joAAABiAAIAMkRpeDpVc2VyczpldGllbm5lOkRvY3VtZW50czpkZXZlbDptY' | |
'WMtZGV2LXBsYXlib29rAA4AIgAQAG0AYQBjAC0AZABlAHYALQBwAGwAYQB5AGIAbw' | |
'BvAGsADwAIAAMARABpAHgAEgAuVXNlcnMvZXRpZW5uZS9Eb2N1bWVudHMvZGV2ZWw' | |
'vbWFjLWRldi1wbGF5Ym9vawATAAEvAAAVAAIADv//AAA=' | |
)), | |
('Array4', [u'gloups', 5, 4, 3, 2, 1]), | |
('Dict4', {'a': u'ha', 'd': False, 'k': 22}), | |
('Dict4:d', True), | |
]) | |
def test_existing_key(self, preferences, key, value): | |
self.write_read(preferences, key, value) | |
@pytest.mark.parametrize('key,value', [ | |
('Bool1', u'a string'), | |
('Array1:0', u'a string'), | |
]) | |
def test_bad_type(self, preferences, key, value): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write(key, value) | |
assert 'New value type does not match current value type for key' in str(excinfo.value) | |
@pytest.mark.parametrize('key,value', BASE_VALUES + [ | |
('Array4:2', 8), | |
('Array4:2:', 10), | |
('Dict4:int4', 8), | |
(':Dict4:int4', 45), | |
('Dict4:b:', False), | |
('ComplexValue1:0:Array2:1', 22), | |
('ComplexValue2:0:Array2:0', 22), | |
]) | |
def test_nested_keys(self, preferences, key, value): | |
self.write_read(preferences, key, value) | |
def test_nonexistant_index(self, preferences): | |
with pytest.raises(IndexError) as excinfo: | |
preferences.write('Array4:20', 33) | |
assert 'out of range' in str(excinfo.value) | |
def test_complex_value(self, preferences): | |
value = [ | |
{'a': 2, 'b': [True, u'Allo', 23.34, datetime.datetime.utcnow()]}, | |
BINARY_DATA, | |
] | |
self.write_read(preferences, 'ComplexValue2', value) | |
def test_bad_complex_value(self, preferences): | |
value = [ | |
{'a': 2, 'b': [ | |
set([1, 2]), u'Allo', 23.34, datetime.datetime.utcnow() | |
]}, | |
BINARY_DATA, | |
] | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write('ComplexValue2', _auto_cast_type(value)) | |
assert '` is unsupported.' in str(excinfo.value) | |
@pytest.mark.parametrize('key', [ | |
'ComplexValue1:key2:key3', | |
'ComplexValue1:0:Array2:AnotherKey', | |
]) | |
def test_node_key_type_mismatch(self, preferences, key): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write(key, 22) | |
assert 'Type mismatch between the key' in str(excinfo.value) | |
def test_bad_parent(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write('Data1:key', 22) | |
assert 'Type mismatch between the key' in str(excinfo.value) | |
@pytest.mark.parametrize('key,value', [ | |
('Array5', [7, 8]), | |
('Array5', [9, 10]), | |
('Array5', [11]), | |
('Array6', [10]), | |
('Array6', [1, u'2', {'a': u'b'}, [u'C', datetime.datetime.utcnow()]]), | |
('Array6', [{'a': [1, 2, u'b']}]), | |
]) | |
def test_array_add(self, preferences, key, value): | |
value = _auto_cast_type(value) | |
preferences.write(key, value, array_add=True) | |
value = self.normalize_datetime(value) | |
out = preferences.read(key) | |
if type(value) is not list: | |
value = [value] | |
out = out[len(value) * -1:] | |
assert out == value | |
def test_array_add_nonexistant_index(self, preferences): | |
with pytest.raises(IndexError) as excinfo: | |
preferences.write('ComplexValue1:4', [33, 44], array_add=True) | |
assert 'out of range' in str(excinfo.value) | |
def test_array_add_nonexistant_index_and_sublevel(self, preferences): | |
with pytest.raises(IndexError) as excinfo: | |
preferences.write('ComplexValue1:10:Array2', 22, array_add=True) | |
assert 'out of range' in str(excinfo.value) | |
@pytest.mark.parametrize('key', [ | |
'Dict1:a', | |
'Array1:1', | |
]) | |
def test_array_add_not_to_list(self, preferences, key): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write(key, 22, array_add=True) | |
assert "With array_add end node should be a list and it's not." in str(excinfo.value) | |
def test_array_add_bad_parent(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.write('Data1:key', 22, array_add=True) | |
assert 'Type mismatch between the key' in str(excinfo.value) | |
def test_nsglobaldomain(self): | |
preferences = CFPreferences('NSGlobalDomain') | |
preferences.write('GlobalBool', True) | |
out = preferences.read('GlobalBool') | |
assert out is True | |
def test_anyuser(self): | |
preferences = CFPreferences('com.ansible.osx_defaults', any_user=True) | |
preferences.write('GlobalBool', True) | |
out = preferences.read('GlobalBool') | |
assert out is True | |
def test_nsglobaldomain_anyuser(self): | |
preferences = CFPreferences('NSGlobalDomain', any_user=True) | |
preferences.write('GlobalAnyBool', True) | |
out = preferences.read('GlobalAnyBool') | |
assert out is True | |
@pytest.fixture | |
def nonexistant_preferences(self, request): | |
domain = 'com.ansible.nonexistant' | |
preferences = CFPreferences(domain) | |
def delete(): | |
name = domain + '.plist' | |
path = os.path.expanduser('~/Library/Preferences/') | |
# This loop is needed to check when the temporary file with the new | |
# values created by the system will be replacing the real plist. | |
# Sometimes this can take many seconds. Without this sometimes the | |
# real plist is not deleted. | |
while len([n for n in os.listdir(path) if name in n]) > 1: | |
time.sleep(0.1) | |
try: | |
os.remove(os.path.join(path, name)) | |
except OSError: | |
pass | |
delete() | |
request.addfinalizer(delete) | |
return preferences | |
def test_nonexistant_domain(self, nonexistant_preferences): | |
self.write_read(nonexistant_preferences, 'Bool1', True) | |
class TestDelete: | |
def test_bad_key_type(self, preferences): | |
with pytest.raises(TypeError) as excinfo: | |
preferences.delete(22) | |
assert 'Key should be a string' in str(excinfo.value) | |
@pytest.mark.parametrize('key', [ | |
'Bool1', | |
'Int1', | |
'Float1', | |
'String1', | |
'Datetime1', | |
'Data1', | |
'Array1', | |
'Dict1', | |
]) | |
def test_basic_type(self, preferences, key): | |
preferences.delete(key) | |
out = preferences.read(key) | |
assert out is None | |
def test_nested_key(self, preferences): | |
value = [ | |
{ | |
'a': 2, | |
'b': [True, u'Allo', 23.34, datetime.datetime.utcnow()], | |
'c': False | |
}, | |
BINARY_DATA, | |
] | |
preferences.write('DeleteNested', deepcopy(value)) | |
keys = ( | |
('DeleteNested:0:b:1:Nonexistant', None), | |
('DeleteNested:0:c:BadType', None), | |
(':DeleteNested:0:b:1', 23.34), | |
('DeleteNested:0:b:', None), | |
('DeleteNested:1', None), | |
) | |
for key, value in keys: | |
preferences.delete(key) | |
out = preferences.read(key) | |
assert out == value | |
out = preferences.read('DeleteNested') | |
assert out == [{'a': 2, 'c': False}] | |
@pytest.mark.parametrize('key', [ | |
'NonexistentKey', | |
'Dict1:d', | |
'Array1:12', | |
]) | |
def test_nonexistant_key(self, preferences, key): | |
assert preferences.delete(key) is None | |
def test_from_nonexistant_domain(self): | |
preferences = CFPreferences('com.ansible.nonexistant') | |
assert preferences.delete('NonexistentKey') is None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment