Created
April 5, 2017 07:22
-
-
Save frisi/56b64fa6630a6ec6a69fbff9e571ac02 to your computer and use it in GitHub Desktop.
create copy of linguaplone translation folder and fix internal links and embedded images after that (see https://community.plone.org/t/translating-website-for-different-regions/3792)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from OFS.CopySupport import CopyError | |
from Products.Archetypes.Field import ReferenceField | |
from Products.Archetypes.Field import TextField | |
from Products.Archetypes.exceptions import ReferenceException | |
from Products.CMFCore.WorkflowCore import WorkflowException | |
from Products.statusmessages.interfaces import IStatusMessage | |
from plone import api | |
from z3c.form import field, button | |
from z3c.form.form import Form | |
from customer.policy import _ | |
from zope import schema | |
from zope.annotation.interfaces import IAnnotations | |
from zope.interface import Interface | |
import logging | |
import re | |
import transaction | |
logger = logging.getLogger('customer.translate') | |
class ICopyContentToLanguage(Interface): | |
target_languages = schema.List( | |
title=_(u'Target languages'), | |
description=_( | |
u'Select into which languages the translation will be made'), | |
value_type=schema.Choice( | |
title=_(u'Target languages'), | |
vocabulary='plone.app.vocabularies.SupportedContentLanguages' | |
), | |
default=[], | |
) | |
class CopyContentToLanguage(Form): | |
"""create a copy of the context for the chosen language and | |
* register translations proplery (set translation references) | |
* fix images/links to uids within text to translated content | |
based on code erral kindly supplied in this thread: | |
https://community.plone.org/t/translating-website-for-different-regions/3792/ | |
""" | |
fields = field.Fields(ICopyContentToLanguage) | |
label = _(u'Copy the contents of this objects and its subobjects ' | |
u'to the selected language/country') | |
ignoreContext = True | |
@button.buttonAndHandler(u'Copy content') | |
def copy_content_to(self, action): | |
self.skipped_items = [] | |
self.problem_items = [] | |
# {'uid-of-translation': {'fieldname': [object1, object2]}} | |
self.references_to_fix = {} | |
# dictionary of uids of articles with list of textfields | |
# containing resolveuid links | |
# {'uid-of-translation': ['text', 'otherfieldname']} | |
self.text_contains_uids = {} | |
data, errors = self.extractData() | |
if errors: | |
self.status = self.formErrorsMessage | |
return | |
target_languages = data.get('target_languages', []) | |
path = '/'.join(self.context.getPhysicalPath()) | |
logger.info('copy content of {} to language(s): {}'.format( | |
path, | |
', '.join(target_languages))) | |
api.portal.get_tool('portal_catalog') | |
brains = list(api.content.find(context=self.context, | |
sort_on='getObjPositionInParent')) | |
# sort brains by length of path to make sure we're not translating | |
# an item that has an untranslated parent | |
brains.sort(lambda x, y: cmp(len(x.getPath().split('/')), | |
len(y.getPath().split('/')))) | |
count_translated = 0 | |
for brain in brains: | |
obj = brain.getObject() | |
if obj != self.context: | |
count_translated += self.copy_content_of(obj, target_languages) | |
logger.info('translated {} items'.format(count_translated)) | |
self.fix_references() | |
self.fix_resolve_uid_links() | |
logger.info('done') | |
msg = _(u'Contents copied successfully. Created ${num} translations', | |
mapping={'num': count_translated}) | |
IStatusMessage(self.request).add(msg, type='info') | |
if self.skipped_items: | |
msg = u'Skipped {} items: {}'.format( | |
len(self.skipped_items), | |
', '.join(self.skipped_items)) | |
IStatusMessage(self.request).add(msg, type='warning') | |
logger.warn(msg) | |
if self.problem_items: | |
logger.warn('problems for {} items:\n {}'.format( | |
len(self.problem_items), | |
'\n'.join(self.problem_items))) | |
return | |
def copy_content_of(self, item, target_languages): | |
item_path = '/'.join(item.getPhysicalPath()) | |
if item.portal_type == 'FormFolder': | |
logger.warn('Skipping FormFolder ' + item_path) | |
self.skipped_items.append(item_path) | |
return 0 | |
if item.aq_parent.portal_type == 'FormFolder': | |
logger.info('Skipping FormFolder contents ' + item_path) | |
return 0 | |
count = 0 | |
logger.info('translating ' + item_path) | |
for language in target_languages: | |
# create translation | |
translation = item.getTranslation(language) | |
if translation: | |
logger.info('already translated: {}'.format( | |
'/'.join(translation.getPhysicalPath()))) | |
continue | |
try: | |
translation = item.addTranslation(language) | |
except CopyError, e: | |
logger.error( | |
'problem: translation could not be created for ' + | |
item_path) | |
logger.error(str(e)) | |
self.problem_items.append(item_path) | |
continue | |
count += 1 | |
translation_path = '/'.join(translation.getPhysicalPath()) | |
logger.info('created translation for {}: {}'.format( | |
language, translation_path)) | |
self.copy_fields(item, translation) | |
self.copy_properties(item, translation) | |
self.copy_seo_properties(item, translation) | |
self.copy_workflow(item, translation) | |
if item.id != translation.id: | |
logger.info('correct different id {} and {}'.format( | |
item_path, translation_path)) | |
try: | |
# Make sure all persistent objects have _p_jar attribute | |
# otherwhise we can get the CopyError | |
# https://docs.plone.org/4/en/develop/plone/content/rename.html | |
transaction.savepoint(optimistic=True) | |
api.content.rename(translation, item.id, True) | |
except CopyError: | |
# usually happens if obj.cb_isMoveable() | |
self.problem_items.append(translation_path) | |
logger.warning( | |
'problem: could not rename {} {} to {}'.format( | |
translation.portal_type, translation_path, | |
item.id)) | |
translation.reindexObject() | |
return count | |
def copy_fields(self, source, target): | |
target_path = '/'.join(target.getPhysicalPath()) | |
logger.info('copying fields...') | |
for field in source.Schema().fields(): | |
fieldname = field.__name__ | |
if fieldname.lower() in ['language', 'id']: | |
# skip language | |
# skip id (setting it makes it unicode which breaks catalogs) | |
logger.debug('Skipped %s' % fieldname) | |
continue | |
target_field = target.getField(fieldname, target) | |
if target_field is None: | |
logger.warn( | |
('problem: field {} not available on ' | |
'target object {}').format( | |
fieldname, target_path)) | |
self.problem_items.append(target_path) | |
continue | |
if target_field.writeable(target): | |
value = field.get(source) | |
if isinstance(value, unicode): | |
logger.info(u'unicode value! {}: {}'.format( | |
field.getName(), value)) | |
value = value.encode('utf-8') | |
if value: | |
logger.debug('Set attribute {} in {}'.format( | |
fieldname, target_path)) | |
if type(field) == TextField: | |
# no not run transforms on text | |
# keep resolveuid/adfadf/@@images/mini instead of | |
# /path/to/image/@@images/45245345435.jpg | |
value = field.getRaw(source) | |
if 'resolveuid' in value: | |
# remember this object to later change the uids to | |
# the translated objects | |
fieldnames = self.text_contains_uids.get( | |
target.UID(), []) | |
fieldnames.append(fieldname) | |
self.text_contains_uids[target.UID()] = fieldnames | |
if type(field) == ReferenceField: | |
# store references | |
# to fix them later when every content item | |
# has been translated | |
refs = self.references_to_fix.get(target.UID(), {}) | |
refs[fieldname] = [ref.UID() for ref in value] | |
self.references_to_fix[target.UID()] = refs | |
try: | |
target_field.set(target, value) | |
except ReferenceException, e: | |
logger.error('invalid references: ' + str(e)) | |
else: | |
logger.info( | |
('Not writeable. Can not set value for ' | |
'field {} in {}.').format(fieldname, target_path)) | |
def copy_workflow(self, source, target): | |
try: | |
source_state = api.content.get_state(source) | |
current_state = api.content.get_state(target) | |
except WorkflowException: | |
# no workflow used for this type, skip this step | |
return | |
if current_state != source_state: | |
logger.info('change workflow state to ' + source_state) | |
api.content.transition(target, to_state=source_state) | |
def copy_seo_properties(self, source, target): | |
source_anno = IAnnotations(source) | |
keys = [key for key in source_anno.keys() if key.startswith('pSEO_')] | |
target_anno = IAnnotations(target) | |
for key in keys: | |
val = source_anno.get(key) | |
target_anno[key] = val | |
logger.debug('seo perseo setting {}={}'.format(key, val)) | |
def copy_properties(self, source, target): | |
for prop in source.propertyMap(): | |
_id = prop['id'] | |
if _id == 'title': | |
continue | |
val = source.getProperty(_id) | |
target.manage_addProperty(prop['id'], val, prop['type']) | |
logger.debug('set property {}: {}'.format(_id, val)) | |
def fix_references(self): | |
"""we stored all items that reference others and now we | |
fix their references to point to the Translations | |
""" | |
logger.info('fixing reference fields') | |
count = 0 | |
not_translated_count = 0 | |
for item_uid, refs in self.references_to_fix.iteritems(): | |
obj = api.content.get(UID=item_uid) | |
language = obj.getLanguage() | |
path = '/'.join(obj.getPhysicalPath()) | |
for fieldname, uids in refs.iteritems(): | |
translated_uids, not_translated = self._translated_uids( | |
uids, language) | |
if not_translated: | |
not_translated_count += len(not_translated) | |
logger.warn( | |
('{}: no {} translations for field {} for these uids: ' | |
'{}').format(path, language, fieldname, | |
','.join(not_translated))) | |
count += len(translated_uids) | |
obj.getField(fieldname).set(obj, translated_uids) | |
obj.reindexObject() | |
logger.info('fixed {} references for {} items '.format( | |
count, len(self.references_to_fix))) | |
if not_translated_count: | |
logger.warn( | |
'translation missing for {} items'.format(not_translated_count) | |
) | |
def _translated_uids(self, uids, language): | |
"""looks up translations for `uids` for the given `language` | |
returns a tuple of two lists (translated_uids, uids_not_translated) | |
""" | |
not_translated = [] | |
translated_uids = [] | |
for uid in uids: | |
original = api.content.get(UID=uid) | |
if original is None: | |
# we silently ignore these, as they simply don't appear | |
# on the portal and don't lead to errors | |
continue | |
translation = original.getTranslation(language) | |
if translation is None: | |
not_translated.append('/'.join(original.getPhysicalPath())) | |
continue | |
translated_uids.append(translation.UID()) | |
return (translated_uids, not_translated) | |
def fix_resolve_uid_links(self): | |
logger.info('fixing resolveuid references in textfields') | |
uids = re.compile(r'(?<=resolveuid/)[0-9,a-f]*', re.DOTALL) | |
count_articles = 0 | |
# trick to overcome UnboundLocalError | |
# http://stackoverflow.com/a/9264811/810427 | |
count_broken = [0] | |
count_not_translated = [0] | |
count_replaced = [0] | |
for item_uid, fieldnames in self.text_contains_uids.iteritems(): | |
obj = api.content.get(UID=item_uid) | |
language = obj.getLanguage() | |
path = '/'.join(obj.getPhysicalPath()) | |
for fieldname in fieldnames: | |
field = obj.getField(fieldname) | |
text = field.getRaw(obj) | |
def replaceuid(m): | |
uid = m.group(0) | |
obj = api.content.get(UID=uid) | |
if obj is None: | |
logger.warn( | |
'broken link for {} field {} uid {}'.format( | |
path, fieldname, uid)) | |
count_broken[0] += 1 | |
return uid | |
translation = obj.getTranslation(language) | |
if translation is None: | |
logger.warn( | |
('{}: field "{}" no translation for ' | |
'uid {} ({})').format( | |
path, fieldname, uid, | |
'/'.join(obj.getPhysicalPath()))) | |
count_not_translated[0] += 1 | |
return uid | |
count_replaced[0] += 1 | |
return translation.UID() | |
new_text = re.sub(uids, replaceuid, text) | |
field.set(obj, new_text) | |
count_articles += 1 | |
obj.reindexObject() | |
logger.info('fixed {} resolveuid references for {} items '.format( | |
count_replaced[0], count_articles)) | |
if count_broken[0]: | |
logger.warn( | |
'found {} broken links (see log above)'.format( | |
count_broken[0])) | |
if count_not_translated[0]: | |
logger.warn( | |
'{} objects have no translation (see log above'.format( | |
count_not_translated[0])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment