Last active
August 29, 2015 14:01
-
-
Save adongy/4d70c4981a05b34a845c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import logging | |
import os | |
import tempfile | |
import time | |
from fractions import Fraction | |
import usb | |
from jpegtran import JPEGImage | |
from spreads.vendor.pathlib import Path | |
from spreads.config import OptionTemplate | |
from spreads.plugin import DevicePlugin, DeviceFeatures | |
from spreads.util import DeviceException | |
import lupa | |
WHITEBALANCE_MODES = { | |
'Auto': 0, | |
'Daylight': 1, | |
'Cloudy': 2, | |
'Tungsten': 3, | |
'Fluorescent': 4, | |
'Fluorescent H': 5, | |
'Custom': 7 | |
} | |
#Run to get a chdkptp equivalent shell | |
INIT_SCRIPT = ''' | |
require('lfs') | |
require('chdkptp') | |
require('lbuf') | |
require('rawimg') | |
require('io') | |
require('os') | |
util=require('util') | |
util:import() | |
ustime=require('ustime') | |
fsutil=require('fsutil') | |
prefs=require('prefs') | |
chdku=require('chdku') | |
cli=require('cli') | |
exp=require('exposure') | |
dng=require('dng') | |
dngcli=require('dngcli') | |
--set debug | |
prefs._add('core_verbose','number','ptp core verbosity',0, | |
function(self) | |
return corevar.get_verbose() | |
end, | |
function(self,val) | |
corevar.set_verbose(val) | |
end | |
) | |
prefs._set('core_verbose', 2) | |
prefs._set('cli_verbose', 2) | |
--global connection variable | |
con=chdku.connection() | |
dngcli.init_cli() | |
''' | |
class CHDKPTPException(Exception): | |
pass | |
class CHDKCameraDevice(DevicePlugin): | |
""" Plugin for digital cameras running the CHDK firmware. | |
""" | |
features = (DeviceFeatures.PREVIEW, DeviceFeatures.IS_CAMERA) | |
target_page = None | |
_cli_flags = None | |
_chdk_buildnum = None | |
_can_remote = False | |
_zoom_steps = 0 | |
MAX_RESOLUTION = 0 | |
MAX_QUALITY = 0 | |
@classmethod | |
def configuration_template(cls): | |
conf = super(CHDKCameraDevice, cls).configuration_template() | |
conf.update( | |
{'sensitivity': OptionTemplate(80, "The ISO sensitivity value"), | |
'shutter_speed': OptionTemplate( | |
u"1/25", "The shutter speed as a fraction"), | |
'zoom_level': OptionTemplate(3, "The default zoom level"), | |
'dpi': OptionTemplate(300, "The capturing resolution"), | |
'shoot_raw': OptionTemplate(False, "Shoot in RAW format (DNG)", | |
advanced=True), | |
'focus_distance': OptionTemplate(0, "Set focus distance"), | |
'monochrome': OptionTemplate( | |
False, "Shoot in monochrome mode (reduces file size)"), | |
'wb_mode': OptionTemplate(value=sorted(WHITEBALANCE_MODES), | |
docstring='White balance mode', | |
selectable=True, advanced=True), | |
'chdkptp_path': OptionTemplate(u"/usr/local/lib/chdkptp", | |
"Path to CHDKPTP binary/libraries", | |
advanced=True), | |
}) | |
return conf | |
@classmethod | |
def yield_devices(cls, config): | |
""" Search for usable devices, yield one at a time | |
:param config: spreads configuration | |
:type config: spreads.config.ConfigView | |
""" | |
SPECIAL_CASES = { | |
# (idVendor, idProduct): SpecialClass | |
(0x4a9, 0x31ef): QualityFix, # not r47, but has the same bug | |
(0x4a9, 0x3218): QualityFix, | |
(0x4a9, 0x3223): QualityFix, | |
(0x4a9, 0x3224): QualityFix, | |
(0x4a9, 0x3225): QualityFix, | |
(0x4a9, 0x3226): QualityFix, | |
(0x4a9, 0x3227): QualityFix, | |
(0x4a9, 0x3228): QualityFix, | |
(0x4a9, 0x3229): QualityFix, | |
(0x4a9, 0x322a): A2200, | |
(0x4a9, 0x322b): QualityFix, | |
(0x4a9, 0x322c): QualityFix, | |
} | |
for dev in usb.core.find(find_all=True): | |
cfg = dev.get_active_configuration()[(0, 0)] | |
ids = (dev.idVendor, dev.idProduct) | |
is_ptp = (hex(cfg.bInterfaceClass) == "0x6" | |
and hex(cfg.bInterfaceSubClass) == "0x1") | |
if not is_ptp: | |
continue | |
if ids in SPECIAL_CASES: | |
yield SPECIAL_CASES[ids](config, dev) | |
else: | |
yield cls(config, dev) | |
def __init__(self, config, device): | |
""" Set connection information and try to obtain target page. | |
:param config: spreads configuration | |
:type config: spreads.config.ConfigView | |
:param device: USB device to use for the object | |
:type device: `usb.core.Device <http://github.com/walac/pyusb>`_ | |
""" | |
super(CHDKCameraDevice, self).__init__(config, device) | |
chdkptp_path = Path(self.config["chdkptp_path"].get(unicode)) | |
os.environ['LUA_PATH'] = unicode(chdkptp_path / "lua" / "?.lua") | |
self._lua = lupa.LuaRuntime(unpack_returned_tuples=True) | |
#run init script | |
self._lua.execute(INIT_SCRIPTW) | |
self.logger = logging.getLogger('ChdkCamera') | |
self._usbport = (device.bus, device.address) | |
self._serial_number = (usb.util.get_string(device, 256, device.iSerialNumber) | |
.strip('\x00')) | |
self.logger.debug("Device has serial number {0}" | |
.format(self._serial_number)) | |
#connect to camera | |
self._execute_lua_camera("connect -d={1:03} -b={0:03}".format(*self._usbport)) | |
self._chdk_buildnum = self._execute_lua_camera("get_buildinfo()", | |
get_result=True)["build_revision"] | |
# PTP remote shooting is available starting from SVN r2927 | |
self._can_remote = self._chdk_buildnum >= 2927 | |
self._zoom_steps = self._execute_lua_camera("get_zoom_steps()", | |
get_result=True) | |
try: | |
self.target_page = self._get_target_page() | |
except ValueError: | |
self.target_page = None | |
# Set camera to highest quality | |
self._execute_lua_camera('exit_alt(); set_config_value(291, 0);' | |
'enter_alt();') | |
self.logger = logging.getLogger('ChdkCamera[{0}]' | |
.format(self.target_page)) | |
def connected(self): | |
"""connected is taken as the chdkptp-style connected, | |
i.e. there is an active PTP connection to the camera | |
""" | |
def match_serial(dev): | |
serial = ( | |
usb.util.get_string(dev, 256, dev.iSerialNumber) | |
.strip('\x00')) | |
return serial == self._serial_number | |
# Check if device is still attached | |
unchanged = usb.core.find(bus=self._usbport[0], | |
address=self._usbport[1], | |
custom_match=match_serial) is not None | |
if unchanged: | |
return True | |
new_device = usb.core.find(idVendor=0x04a9, # Canon vendor ID | |
custom_match=match_serial) | |
if new_device is None: | |
return False | |
self._usbport = (new_device.bus, new_device.address) | |
self._execute_lua_camera("connect -d={1:03} -b={0:03}".format(*self._usbport)) | |
return True | |
def set_target_page(self, target_page): | |
""" Set the device target page. | |
:param target_page: The target page name | |
:type target_page: unicode in (u"odd", u"even") | |
""" | |
if not target_page in (u"odd", u"even"): | |
raise ValueError('Target page must be u"odd" or u"even"') | |
tmp_handle = tempfile.mkstemp(text=True) | |
os.write(tmp_handle[0], "print({0})".format(target_page.upper())) | |
self._execute_lua_local("upload {0} \"OWN.LUA\"".format(tmp_handle[1])) | |
self.target_page = target_page | |
os.remove(tmp_handle[1]) | |
def prepare_capture(self, path): | |
shoot_monochrome = self.config['monochrome'].get(bool) | |
# Try to go into alt mode to prevent weird behaviour | |
self._execute_lua_camera("enter_alt()") | |
# Try to put into record mode | |
try: | |
self._execute_lua_local("rec") | |
except CHDKPTPException as e: | |
self.logger.debug(e) | |
self.logger.info("Camera already seems to be in recording mode") | |
self._set_zoom(int(self.config['zoom_level'].get())) | |
# Disable ND filter | |
self._execute_lua_camera("set_nd_filter(2)") | |
self._set_focus() | |
if shoot_monochrome: | |
rv = self._execute_lua_camera( | |
"capmode = require(\"capmode\")\n" | |
"return capmode.set(\"SCN_MONOCHROME\")", | |
get_result=True | |
) | |
if not rv: | |
self.logger.warn("Monochrome mode not supported on this " | |
"device, will be disabled.") | |
# Set White Balance mode | |
self._set_wb() | |
# Disable flash | |
self._execute_lua_camera( | |
"props = require(\"propcase\")\n" | |
"if(get_flash_mode()~=2) then set_prop(props.FLASH_MODE, 2) end") | |
# Set Quality | |
self._execute_lua_camera("set_prop(require('propcase').QUALITY, {0})" | |
.format(self.MAX_QUALITY)) | |
self._execute_lua_camera("set_prop(require('propcase').RESOLUTION, {0})" | |
.format(self.MAX_RESOLUTION)) | |
def finish_capture(self): | |
# Switch camera back to play mode. | |
# This will retract the lens and protect it from dust. | |
self._execute_lua_local("play") | |
def get_preview_image(self): | |
fpath = tempfile.mkstemp()[1] | |
cmd = "dumpframes -count=1 -nobm -nopal" | |
self._execute_lua_local("{0} {1}".format(cmd, fpath)) | |
with open(fpath, 'rb') as fp: | |
data = fp.read() | |
os.remove(fpath) | |
return data | |
def capture(self, path): | |
# NOTE: To obtain the "real" Canon ISO value, we multiply the | |
# "market" value from the config by 0.65. | |
# See core/shooting.c#~l150 in the CHDK source for more details | |
sensitivity = int(self.config["sensitivity"].get()) | |
shutter_speed = float(Fraction(self.config["shutter_speed"] | |
.get(unicode))) | |
shoot_raw = self.config['shoot_raw'].get(bool) | |
if self._can_remote: | |
cmd = ("remoteshoot -tv={0} -sv={1} {2} \"{3}\"" | |
.format(shutter_speed, sensitivity * 0.65, | |
"-dng" if shoot_raw else "", path)) | |
else: | |
cmd = ("shoot -tv={0} -sv={1} -dng={2} -rm -dl \"{3}\"" | |
.format(shutter_speed, sensitivity * 0.65, | |
int(shoot_raw), path)) | |
try: | |
self._execute_lua_local(cmd) | |
except CHDKPTPException as e: | |
if 'not in rec mode' in e.message: | |
self.prepare_capture(None) | |
self.capture(path) | |
else: | |
self.logger.warn("Capture command failed.") | |
raise e | |
extension = 'dng' if shoot_raw else 'jpg' | |
local_path = "{0}.{1}".format(path, extension) | |
# Set EXIF orientation | |
self.logger.debug("Setting EXIF orientation on captured image") | |
img = JPEGImage(local_path) | |
if self.target_page == 'odd': | |
img.exif_orientation = 6 # -90° | |
else: | |
img.exif_orientation = 8 # 90° | |
img.save(local_path) | |
def show_textbox(self, message): | |
messages = message.split("\n") | |
script = [ | |
'screen_width = get_gui_screen_width();', | |
'screen_height = get_gui_screen_height();', | |
'draw_rect_filled(0, 0, screen_width, screen_height, 256, 256);' | |
] | |
script.extend( | |
['draw_string(0, 0+(screen_height/10)*{0}, "{1}", 258, 256);' | |
.format(idx, msg) for idx, msg in enumerate(messages, 1)] | |
) | |
self._execute_lua_camera("\n".join(script), wait=False, get_result=False) | |
def _execute_lua_local(self, command): | |
"""execute lua command just as if we were in a chdkptp shell""" | |
command = "cli:execute([[{0}]])".format(command) | |
self.logger.debug("Input Lua: {0}".format(command)) | |
#status = True if okay, False else | |
status, output = self._lua.execute(command) | |
self.logger.debug("Lua returned: status {0}\n{1}".format(status, output)) | |
# Filter out connected message (why?) | |
output = [x for x in output if not x.startswith('connected:')] | |
# Check for possible CHDKPTP errors | |
if not status: | |
raise CHDKPTPException("\n".join(output)) | |
return output | |
def _execute_lua_camera(self, script, wait=True, get_result=False, timeout=256): | |
"""execute lua commands on the camera""" | |
if get_result and not "return" in script: | |
script = "return({0})".format(script) | |
cmd = "luar" if wait else "lua" | |
output = self._execute_lua_local("[[{0} {1}]]".format(cmd, script)) | |
if not get_result: | |
return | |
#why filter? | |
output = [x for x in output if x.find(":return:")][0] | |
return output | |
def _get_target_page(self): | |
"""determine if the camera shoots odd or even pages | |
'odd' are ALWAYS the right pages, 'even' left ones | |
""" | |
try: | |
target_page = self._execute_lua_camera('loadfile("OWN.LUA")()', get_result=True) | |
except DeviceException: | |
raise ValueError("Could not find OWN.LUA") | |
return target_page if target_page else None | |
def _set_zoom(self, level): | |
if level >= self._zoom_steps: | |
raise ValueError("Zoom level {0} exceeds the camera's range!" | |
" (max: {1})".format(level, self._zoom_steps - 1)) | |
self._execute_lua_camera("set_zoom({0})".format(level), wait=True) | |
def _acquire_focus(self): | |
""" Acquire auto focus and lock it. """ | |
self._execute_lua_camera("enter_alt()") | |
# Try to put into record mode | |
try: | |
self._execute_lua_local("rec") | |
except CHDKPTPException as e: | |
self.logger.debug(e) | |
self.logger.info("Camera already seems to be in recording mode") | |
self._set_zoom(int(self.config['zoom_level'].get())) | |
self._execute_lua_camera("set_aflock(0)") | |
self._execute_lua_camera("press('shoot_half')") | |
time.sleep(0.8) | |
self._execute_lua_camera("release('shoot_half')") | |
time.sleep(0.5) | |
return self._execute_lua_camera("get_focus()", get_result=True) | |
def _set_focus(self): | |
focus_distance = int(self.config['focus_distance'].get()) | |
self._execute_lua_camera("set_aflock(0)") | |
if focus_distance == 0: | |
return | |
self._execute_lua_camera("set_focus({0:.0f})".format(focus_distance)) | |
time.sleep(0.5) | |
self._execute_lua_camera("press('shoot_half')") | |
time.sleep(0.25) | |
self._execute_lua_camera("release('shoot_half')") | |
time.sleep(0.25) | |
self._execute_lua_camera("set_aflock(1)") | |
def _set_wb(self): | |
value = WHITEBALANCE_MODES.get(self.config['wb_mode'].get()) | |
self._execute_lua_camera("set_prop(require('propcase').WB_MODE, {0})" | |
.format(value)) | |
class A2200(CHDKCameraDevice): | |
""" Canon A2200 driver. | |
Works around some quirks of that CHDK port. | |
""" | |
MAX_RESOLUTION = 0 | |
MAX_QUALITY = 1 | |
def __init__(self, config, device): | |
super(A2200, self).__init__(config, device) | |
if self.target_page is not None: | |
self.logger = logging.getLogger( | |
'A2200Device[{0}]'.format(self.target_page)) | |
else: | |
self.logger = logging.getLogger('A2200Device') | |
def finish_capture(self): | |
# Putting the device back into play mode crashes the a2200 with | |
# chdk 1.3, this is why we stub it out here. | |
pass | |
def _set_zoom(self, level): | |
""" Set zoom level. | |
The A2200 currently has a bug, where setting the zoom level | |
directly via set_zoom crashes the camera quite frequently, so | |
we work around that by simulating button presses. | |
:param level: The zoom level to be used | |
:type level: int | |
""" | |
if level >= self._zoom_steps: | |
raise ValueError( | |
"Zoom level {0} exceeds the camera's range!" | |
" (max: {1})".format(level, self._zoom_steps - 1)) | |
zoom = self._execute_lua_camera("get_zoom()", get_result=True) | |
if zoom < level: | |
self._execute_lua_camera("while(get_zoom()<{0}) do click(\"zoom_in\") end" | |
.format(level + 1), | |
wait=True) | |
elif zoom > level: | |
self._execute_lua_camera("while(get_zoom()>{0}) " | |
"do click(\"zoom_out\") end".format(level + 1), | |
wait=True) | |
class QualityFix(CHDKCameraDevice): | |
""" Fixes a bug that prevents remote capture with the highest resolution | |
and quality from succeeding. See this CHDK forum post for more details: | |
http://chdk.setepontos.com/index.php?topic=4338.msg111318#msg111318 | |
""" | |
MAX_RESOLUTION = 0 | |
MAX_QUALITY = 1 | |
def __init__(self, config, device): | |
super(QualityFix, self).__init__(config, device) | |
if self.target_page is not None: | |
self.logger = logging.getLogger( | |
'QualityFixDevice[{0}]'.format(self.target_page)) | |
else: | |
self.logger = logging.getLogger('QualityFixDevice') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment