Last active
July 31, 2018 09:22
-
-
Save sudhirpandey/49890b9f1eee28be3cd10d42b24684bf to your computer and use it in GitHub Desktop.
Spark code for analysing data stored in Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkConf, SparkContext | |
from collections import namedtuple | |
import re | |
# One Spark context for the whole job, registered under the name "ESTest".
conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)

# Connector settings handed to EsInputFormat: which node/port to contact,
# which index/type to read, and which query selects the documents.
es_read_conf = {}
es_read_conf["es.nodes"] = "es-server"
es_read_conf["es.port"] = "19200"
es_read_conf["es.resource"] = "paasapp-2018.07.31/access_log"
# An inline query can be used instead of the query file, for example:
#   '{ "range":{ "@timestamp" : {"gte" : "now-15m", "lte" : "now" }}}'
es_read_conf["es.query"] = "query.json"

# RDD over the documents selected by the configuration above; the driver code
# below reads element [0] as the key and element [1] as the document fields.
es_rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf,
)
# Captures the first alphabetic path segment of the URL quoted in a message,
# e.g. "products" in "https://host/products/42". Accepts http and https.
_FETCH_URL_RE = re.compile(r"https?://.+?/([a-zA-Z]+)/?.*")


def botpresent(item):
    """Classify one log document by the content of its ``message`` field.

    Args:
        item: A pair whose second element is a mapping with a ``"message"``
            string (the shape of the records in ``es_rdd``).

    Returns:
        A one-element list ``[(label, 1)]`` suitable for ``flatMap`` followed
        by ``reduceByKey``. The label is one of ``"BOT"``, ``"NONBOT"``,
        ``"<segment>-fetcherror"``, ``"NoHIT"``, ``"stateError"`` or
        ``"Unclassified"``. For a FetchError message that contains no
        recognisable URL the segment is ``"unknown"``.

    Raises:
        KeyError: If the document has no ``"message"`` field.
    """
    message = item[1]["message"]

    if "BOT: true" in message:
        return [("BOT", 1)]
    if "BOT: false" in message:
        return [("NONBOT", 1)]
    if "FetchError:" in message:
        found = _FETCH_URL_RE.search(message)
        # A FetchError without a parsable URL must not abort the whole task.
        segment = found.group(1) if found else "unknown"
        return [(segment + "-fetcherror", 1)]
    if "No hit:" in message:
        return [("NoHIT", 1)]
    if "catching state when error" in message:
        return [("stateError", 1)]

    # Print the raw message so that other kinds of error message can be
    # spotted in the output and given their own label later.
    print(message.encode('utf-8').strip())
    return [("Unclassified", 1)]
def countsignificant(item):
    """Return the whitespace-separated words of the document's message.

    ``item`` is a pair whose second element is a mapping with a ``"message"``
    string.
    """
    return item[1]["message"].split()
#for debug | |
#print(es_rdd.first()) | |
#var=es_rdd.first() | |
#for key, val in var[1].iteritems(): | |
#print key | |
# print(var[1]["message"]) | |
# Only the keys are kept here; this RDD is used just to count the documents.
docs = es_rdd.map(lambda record: record[0])

# Label every document (BOT, NONBOT and the other categories), total the
# labels, and order them from most to least frequent.
reqbotclassification_rdd = es_rdd.flatMap(botpresent)
reqbotclassification = (
    reqbotclassification_rdd
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
)

print('Total number of docs: %s' % docs.count())

results = reqbotclassification.collect()
for result in results:
    print(result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkConf, SparkContext | |
from collections import namedtuple | |
# One Spark context for the whole job, registered under the name "ESTest".
conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)

# Record type pairing a country with an operating system.
Access = namedtuple("Access", "country os")

# Connector settings handed to EsInputFormat: node, port and index/type.
es_read_conf = {}
es_read_conf["es.nodes"] = "servername"
es_read_conf["es.port"] = "9200"
es_read_conf["es.resource"] = "index/syslog"

# RDD over the documents of the configured resource; the driver code below
# reads element [0] as the key and element [1] as the document fields.
es_rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf,
)
def keytuple(item):
    """Build a ``"<country>,<os>"`` key for one document.

    Args:
        item: A pair whose second element is a mapping that may contain a
            ``"geoip"`` mapping (with ``"country_name"``) and an ``"os"``
            string.

    Returns:
        The country name and OS joined by a comma, or ``None`` when the
        document lacks ``geoip``, ``geoip.country_name`` or ``os``. Returning
        ``None`` matches how the ``countries`` / ``country_os`` mappings in
        this script treat documents without those fields.
    """
    # record=Access(country=item['geoip']['country_name'],os=item['os'])
    print(item)  # debugging aid: shows the raw record being keyed

    fields = item[1]
    geoip = fields["geoip"] if "geoip" in fields else None
    if not geoip or "country_name" not in geoip or "os" not in fields:
        return None
    return geoip["country_name"] + ',' + fields["os"]
#for debug | |
#print(es_rdd.first()) | |
#var=es_rdd.first() | |
#for key, val in var[1].iteritems(): | |
#print key | |
#print(var[1]["os"]) | |
def _country_name(item):
    """Return the document's geoip country name, or None when it has none."""
    fields = item[1]
    geoip = fields["geoip"] if "geoip" in fields else None
    if not geoip or "country_name" not in geoip:
        return None
    return geoip["country_name"]


def _country_and_os(item):
    """Return "<country>-<os>" for the document, or None if either is missing."""
    country = _country_name(item)
    if country is None or "os" not in item[1]:
        return None
    return country + '-' + item[1]["os"]


# Only the keys are kept here; this RDD is used just to count the documents.
docs = es_rdd.map(lambda item: item[0])

# Documents without the needed fields are dropped. Otherwise None would be
# counted as a country and the "with CountryInfo" figure would simply equal
# the total number of documents.
countries = es_rdd.map(_country_name).filter(lambda name: name is not None)
country_os = es_rdd.map(_country_and_os).filter(lambda key: key is not None)

# Hits per country, sorted in descending order.
country_total_hit = (
    countries
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda item: -item[1])
)
# Hits per country/platform combination, sorted in descending order.
country_os_stat = (
    country_os
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda item: -item[1])
)

# Print out total docs
print('Total number of docs: %s' % docs.count())
# Print out the number of docs that carry country info
print('Total number of docs with CountryInfo: %s' % countries.count())
# Print distinct countries count
print('Number from countries %s' % countries.distinct().count())

# Print the ten countries making the most requests. take(10) fetches only the
# first ten rows of the sorted RDD instead of collecting every row.
print('Top ten countries making request')
results = country_total_hit.take(10)
for result in results:
    print(result)

# Print the ten country/OS combinations making the most requests.
print('Top ten country-OS combinations making request')
results = country_os_stat.take(10)
for result in results:
    print(result)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"filtered": { | |
"query": { | |
"multi_match": { | |
"query": "URL", | |
"fields": ["message"] | |
} | |
}, | |
"filter": { | |
"bool": { | |
"must": [ | |
{ | |
"range": { | |
"@timestamp": { | |
"gte": "now-1h", | |
"lte": "now" | |
} | |
} | |
}, | |
{ | |
"match": { | |
"kubernetes.labels.appname": "appname" | |
} | |
} | |
], | |
"must_not":[ | |
{ | |
"match": { | |
"message": "U2" | |
} | |
} | |
] | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment