Created
August 7, 2020 12:46
-
-
Save yusufunlu/eb7b72312afd8999ddd1db1900ab7b2e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.artiwise.newswise.supernova.hadron; | |
import com.artiwise.newswise.supernova.client.AnaliticClient; | |
import com.artiwise.newswise.supernova.model.DTO.analytics.DocumentDto; | |
import com.artiwise.newswise.supernova.model.DTO.proxy.RuleProxy; | |
import com.artiwise.newswise.supernova.model.DTO.proxy.TemplateProxy; | |
import com.artiwise.newswise.supernova.model.DTO.proxy.TopicProxy; | |
import com.artiwise.newswise.supernova.model.enums.Operator; | |
import com.artiwise.newswise.supernova.model.stack.Document; | |
import com.artiwise.newswise.supernova.model.stack.DocumentMarked; | |
import com.artiwise.newswise.supernova.service.DateService; | |
import com.artiwise.newswise.supernova.service.RuleService; | |
import com.artiwise.newswise.supernova.util.BeanUtil; | |
import lombok.extern.slf4j.Slf4j; | |
import org.ahocorasick.trie.PayloadEmit; | |
import org.ahocorasick.trie.PayloadTrie; | |
import org.apache.commons.collections4.CollectionUtils; | |
import org.apache.commons.lang3.StringUtils; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Lookup; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.beans.factory.config.BeanDefinition; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.annotation.Scope; | |
import org.springframework.stereotype.Service; | |
import java.util.*; | |
import java.util.stream.Collectors; | |
@Slf4j | |
@Service | |
@Scope(BeanDefinition.SCOPE_PROTOTYPE) | |
public class FilterPipelineAhoCorasick { | |
//TODO | |
//eh cache | |
//cachable | |
//jedis | |
@Value("${hadron.prefix:sprnv_}") | |
public static String PREFIX = "sprnv_"; | |
public static String CATEGORY_PREFIX = "sprnv_category:"; | |
public static String SITE_PREFIX = "sprnv_site:"; | |
public static String SITETYPE_PREFIX = "sprnv_sitetype:"; | |
public static String LANGUAGE_PREFIX = "sprnv_language:"; | |
public static String CONTENTTYPE_PREFIX = "sprnv_contenttype:"; | |
private RuleService ruleService; | |
private AnaliticClient analiticClient; | |
private DateService dateService; | |
private TrieBuilder trieBuilder; | |
List<RuleProxy> ruleDtoList; | |
PayloadTrie<Set<ConditionRulePair>> commonPayloadTrie; | |
Map<String, Set<DocumentDto>> reportDocumentSet = new HashMap<>(); | |
ApplicationContext applicationContext; | |
@Autowired | |
public FilterPipelineAhoCorasick(RuleService ruleService, | |
AnaliticClient analiticClient, | |
DateService dateService, | |
TrieBuilder trieBuilder, | |
ApplicationContext applicationContext) { | |
this.ruleService = ruleService; | |
this.analiticClient = analiticClient; | |
this.dateService = dateService; | |
this.trieBuilder = trieBuilder; | |
this.applicationContext = applicationContext; | |
} | |
public Map<String, Set<DocumentDto>> run(List<DocumentMarked> documentList){ | |
reportDocumentSet.clear(); | |
ruleDtoList = ruleService.getRuleProxyObjects(); | |
commonPayloadTrie = trieBuilder.buildCommonRulePayloadTrie(ruleDtoList); | |
long startCreateRules = System.currentTimeMillis(); | |
commonRulesExecute(documentList); | |
List<DocumentMarked> filteredDocuments = documentList.stream().filter(DocumentMarked::isIncluded).collect(Collectors.toList()); | |
eliminateTopicAndTemplateMarks(filteredDocuments); | |
filteredDocuments.stream().forEach(documentMarked -> { | |
documentConvert(documentMarked); | |
}); | |
long finished = System.currentTimeMillis(); | |
log.debug("Check Conditions Elapsed Time {}", finished - startCreateRules); | |
return reportDocumentSet; | |
} | |
//document ustundeki her bir include rule için bağlı olduğu topicler kontrol edilir | |
//bu topic ile exclude rule topic aynı ise rule sil | |
//doc ustundeki include bir rule için aynı dökümana bağlı exclude rullardan biri ile template aynı ise bu include rule docdan kaldırılmalı | |
//case1 include rule topice ve exclude rule da aynı topice bağlı | |
//case2 include rule topice ve exclude rule da aynı template bağlı | |
//case3 include rule template ve exclude rule da aynı template bağlı | |
//case4 include rule template ve exclude rule topice bağlı bu topic de aynı template bağlı eleme olmaz | |
//case5 include rule topice ve exclude rule da başka bir topige baglı ama ikisi de aynı template baglı | |
private void eliminateTopicAndTemplateMarks(List<DocumentMarked> documentMarkedList){ | |
documentMarkedList.forEach(documentMarked -> { | |
Set<RuleProxy> topicIncludeRuleProxySet = documentMarked.getIncludeRuleProxies().stream().filter(item->item.getTopicProxy()!=null).collect(Collectors.toSet()); | |
Set<TopicProxy> topicExcludeProxySet = documentMarked.getExcludeRuleProxies().stream().filter(item->item.getTopicProxy()!=null).map(item->item.getTopicProxy()) | |
.collect(Collectors.toSet()); | |
Set<RuleProxy> templateIncludeRuleProxySet = documentMarked.getIncludeRuleProxies().stream().filter(item->item.getTemplateProxy()!=null).collect(Collectors.toSet()); | |
Set<TemplateProxy> templateExcludeProxySet = documentMarked.getExcludeRuleProxies().stream().filter(item->item.getTemplateProxy()!=null).map(item->item.getTemplateProxy()) | |
.collect(Collectors.toSet()); | |
Set<RuleProxy> workspaceIncludeRuleProxySet = documentMarked.getIncludeRuleProxies().stream().filter(item->StringUtils.isNotBlank(item.getWscode()) && item.isSystemRule()) | |
.collect(Collectors.toSet()); | |
Set<RuleProxy> workspaceExcludeRuleProxySet = documentMarked.getExcludeRuleProxies().stream().filter(item->StringUtils.isNotBlank(item.getWscode()) && item.isSystemRule()) | |
.collect(Collectors.toSet()); | |
//case1 | |
topicIncludeRuleProxySet.stream().forEach(topicRule->{ | |
boolean matched = topicExcludeProxySet.contains(topicRule); | |
if(matched == true) | |
topicRule.setTopicProxy(null); | |
}); | |
//case2 | |
topicIncludeRuleProxySet.stream().forEach(ruleProxy -> { | |
ruleProxy.getTopicProxy().getTemplateProxyList().removeAll(templateExcludeProxySet); | |
}); | |
//case3 | |
templateIncludeRuleProxySet.stream() | |
.filter(item->templateExcludeProxySet.contains(item.getTemplateProxy())).forEach(ruleProxy -> ruleProxy.setTemplateProxy(null)); | |
Set<TemplateProxy> templateProxySet = new TreeSet<>(); | |
topicExcludeProxySet.stream().forEach(item->templateProxySet.addAll(item.getTemplateProxyList())); | |
//case5 | |
topicIncludeRuleProxySet.stream().forEach(includeRule->{ | |
includeRule.getTopicProxy().getTemplateProxyList().removeAll(templateProxySet); | |
}); | |
workspaceIncludeRuleProxySet.stream().forEach(wsRule->{ | |
Set<RuleProxy> topicRulesCommonWs = topicIncludeRuleProxySet.stream().filter(topicRule-> topicRule.getWscode().equals(wsRule.getWscode())).collect(Collectors.toSet()); | |
Set<RuleProxy> temlateRulesCommonWs = templateIncludeRuleProxySet.stream().filter(topicRule-> topicRule.getWscode().equals(wsRule.getWscode())).collect(Collectors.toSet()); | |
}); | |
workspaceExcludeRuleProxySet.stream().forEach(wsRule->{ | |
Set<RuleProxy> topicRulesCommonWs = topicIncludeRuleProxySet.stream().filter(topicRule-> topicRule.getWscode().equals(wsRule.getWscode())).collect(Collectors.toSet()); | |
Set<RuleProxy> temlateRulesCommonWs = templateIncludeRuleProxySet.stream().filter(topicRule-> topicRule.getWscode().equals(wsRule.getWscode())).collect(Collectors.toSet()); | |
topicIncludeRuleProxySet.removeAll(topicRulesCommonWs); | |
templateIncludeRuleProxySet.removeAll(temlateRulesCommonWs); | |
}); | |
/* documentMarked.getIncludeRuleProxies().stream().forEach(ruleProxy -> { | |
ruleProxy.getTemplateProxyList().stream().forEach(templateProxy -> { | |
log.debug("Content: {} goes to templateId: {}", documentMarked.getDocument().getContent(), templateProxy.getId()); | |
}); | |
});*/ | |
}); | |
} | |
private void commonRulesExecute(List<DocumentMarked> documentList) { | |
for(DocumentMarked document: documentList) { | |
Collection<PayloadEmit<Set<ConditionRulePair>>> emits = commonPayloadTrie.parseText(document.getContent()); | |
Set<String> keywordSet = extractConditionValues(emits); | |
emits.stream().forEach(payloadEmit -> { | |
for(ConditionRulePair conditionRulePair: payloadEmit.getPayload()){ | |
RuleProxy ruleProxy = conditionRulePair.getRule(); | |
Boolean matchedConditions = ruleProxy.getConditionList().stream().allMatch(conditionDto -> { | |
Boolean matchedCondition = false; | |
if(conditionDto.getOperator().equals(Operator.AND)){ | |
matchedCondition = keywordSet.containsAll(conditionDto.getValues()); | |
} else if(conditionDto.getOperator().equals(Operator.OR)) { | |
matchedCondition = CollectionUtils.containsAny(keywordSet, conditionDto.getValues()); | |
} else { | |
matchedCondition = !CollectionUtils.containsAny(keywordSet, conditionDto.getValues()); | |
} | |
return matchedCondition; | |
}); | |
if(matchedConditions){ | |
if(ruleProxy.isInclude()) { | |
document.getIncludeRuleProxies().add(ruleProxy); | |
} else { | |
document.getExcludeRuleProxies().add(ruleProxy); | |
} | |
} | |
} | |
}); | |
List<String> includeRuleNames = document.getIncludeRuleProxies().stream().map(item->item.getName()).collect(Collectors.toList()); | |
if(CollectionUtils.isNotEmpty(includeRuleNames)){ | |
log.info("Document Hashtl {} hitted by rules {}", document.getDocument().getHash_tl(), Arrays.toString(includeRuleNames.toArray())); | |
} | |
} | |
} | |
private Set<String> extractConditionValues(Collection<PayloadEmit<Set<ConditionRulePair>>> emits){ | |
Set<String> keywordSet = emits.stream().map(item->{ | |
String words[] = item.getKeyword().split("\\:"); | |
if(words.length>1){ | |
return words[1]; | |
} else { | |
return words[0]; | |
} | |
}).collect(Collectors.toSet()); | |
return keywordSet; | |
} | |
private void documentConvert(DocumentMarked documentMarked) { | |
documentMarked.getIncludeRuleProxies().stream() | |
.filter(ruleProxy -> !ruleProxy.getTemplateProxyList().isEmpty()) | |
.forEach(ruleProxy -> { | |
String ruleName = ruleProxy.getName(); | |
if(ruleProxy.getTopicProxy()!=null){ | |
String topicName = ruleProxy.getTopicProxy().getName(); | |
ruleProxy.getTopicProxy().getTemplateProxyList() | |
.stream().forEach(templateProxy-> { | |
String templateName = templateProxy.getName(); | |
templateProxy.getReportCode().stream().forEach(reportCode ->{ | |
if(reportDocumentSet.containsKey(reportCode)){ | |
Set<DocumentDto> documentDtoSet = reportDocumentSet.get(reportCode); | |
Optional<DocumentDto> documentDtoOptional = documentDtoSet.stream() | |
.filter(item->item.getOid() == documentMarked.getDocument().getHash_tl()) | |
.findFirst(); | |
if(documentDtoOptional.isPresent()){ | |
DocumentDto documentDto = documentDtoOptional.get(); | |
documentDto.getCategories().getSprnv_rulenames().add(ruleName); | |
documentDto.getCategories().getSprnv_topics().add(topicName); | |
documentDto.getCategories().getSprnv_templates().add(templateName); | |
} else { | |
DocumentDto documentDto = createDocument(documentMarked); | |
documentDto.getCategories().getSprnv_rulenames().add(ruleName); | |
documentDto.getCategories().getSprnv_topics().add(topicName); | |
documentDto.getCategories().getSprnv_templates().add(templateName); | |
documentDtoSet.add(documentDto); | |
} | |
} else { | |
DocumentDto documentDto = createDocument(documentMarked); | |
documentDto.getCategories().getSprnv_rulenames().add(ruleName); | |
documentDto.getCategories().getSprnv_topics().add(topicName); | |
documentDto.getCategories().getSprnv_templates().add(templateName); | |
TreeSet<DocumentDto> documentDtoTreeSet = new TreeSet<>(); | |
documentDtoTreeSet.add(documentDto); | |
reportDocumentSet.put(reportCode,documentDtoTreeSet); | |
} | |
}); | |
}); | |
} else { | |
TemplateProxy templateProxy = ruleProxy.getTemplateProxy(); | |
String templateName = templateProxy.getName(); | |
templateProxy.getReportCode().stream().forEach(reportCode ->{ | |
if(reportDocumentSet.containsKey(reportCode)){ | |
Set<DocumentDto> documentDtoSet = reportDocumentSet.get(reportCode); | |
Optional<DocumentDto> documentDtoOptional = documentDtoSet.stream() | |
.filter(item->item.getOid() == documentMarked.getDocument().getHash_tl()) | |
.findFirst(); | |
if(documentDtoOptional.isPresent()){ | |
DocumentDto documentDto = documentDtoOptional.get(); | |
documentDto.getCategories().getSprnv_rulenames().add(ruleName); | |
documentDto.getCategories().getSprnv_templates().add(templateName); | |
} else { | |
DocumentDto documentDto = createDocument(documentMarked); | |
documentDto.getCategories().getSprnv_rulenames().add(ruleName); | |
documentDto.getCategories().getSprnv_templates().add(templateName); | |
documentDtoSet.add(documentDto); | |
} | |
} else { | |
DocumentDto documentDto = createDocument(documentMarked); | |
documentDto.getCategories().getSprnv_rulenames().add(ruleName); | |
documentDto.getCategories().getSprnv_templates().add(templateName); | |
TreeSet<DocumentDto> documentDtoTreeSet = new TreeSet<>(); | |
documentDtoTreeSet.add(documentDto); | |
reportDocumentSet.put(reportCode,documentDtoTreeSet); | |
} | |
}); | |
} | |
}); | |
} | |
private DocumentDto createDocument(DocumentMarked documentMarked){ | |
DocumentDto documentDto = new DocumentDto(); | |
Document document = documentMarked.getDocument(); | |
if(StringUtils.isNotBlank(document.getUrl_s())){ | |
documentDto.setUrl(document.getUrl_s()); | |
}else { | |
documentDto.setUrl(document.getId()); | |
} | |
if (document.getCrawl_date().equals(document.getModified_dt())){ | |
documentDto.setUpdated(false); | |
} else { | |
documentDto.setUpdated(true); | |
} | |
documentDto.setOid(document.getHash_tl()); | |
documentDto.setDate(dateService.convertGMTForAnalytics(document.getPdbate_tdt())); | |
documentDto.getContentFields().setTitle(document.getTitle()); | |
documentDto.setSim_docs_tls(documentMarked.getDocument().getSim_docs_tls()); | |
if(document.getName_s().equals("Twitter2")){ | |
documentDto.getContentFields().setDescription(document.getContent()); | |
} else { | |
documentDto.getContentFields().setContent(document.getContent()); | |
if(CollectionUtils.isNotEmpty(document.getDescription())) { | |
documentDto.getContentFields().setDescription(document.getDescription().get(0)); | |
} | |
} | |
documentDto.getCategories().setSprnv_sitename(document.getName_s()); | |
documentDto.getCategories().setSprnv_contentcategory(document.getCategory()); | |
documentDto.getCategories().setSprnv_sitetype(document.getTypecode_s()); | |
documentDto.getCategories().setSprnv_contenttype(document.getTag().get(0)); | |
documentDto.getCategories().setSprnv_language(document.getLangcode_s()); | |
documentDto.getDateFields().setSprnv_filterdate(dateService.convertGMTForAnalytics(new Date())); | |
return documentDto; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment