Script to parse Pshitt JSON output into InfluxDB
import requests
import json
from influxdb import InfluxDBClient
# noinspection PyUnresolvedReferences
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# noinspection PyUnresolvedReferences
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Load the pshitt JSON output file
with open('$FILENAME') as f:
    json_file = json.load(f)

# Variable declarations
count = 0
influx_payload = []
results = {}
failed = []

# Iterate over the entries and insert them into InfluxDB
for attempt in json_file['attempts']:
    count += 1
    country_code = ""
    country = ""
    # GeoIP lookups are slow and we don't want to hammer the provider, so cache
    # each result in the 'results' dictionary and query that on later iterations.
    # Check whether this IP has already been looked up
    if attempt['src_ip'] in results:
        # Positive match, don't do another lookup
        print(str(attempt['src_ip']) + " is cached")
    else:
        print(str(attempt['src_ip']) + " not found, performing lookup")
        # Look up the geo data and cache the result
        lookup = requests.get('http://geoip.nekudo.com/api/{}'.format(attempt['src_ip'])).json()
        results[attempt['src_ip']] = lookup
    # If we cannot find geolocation data for an IP, don't add it. Report at the end.
    data = results.get(attempt['src_ip'])
    if 'country' not in data:
        print("Unable to locate data for " + str(attempt['src_ip']))
        failed.append([attempt['src_ip'], attempt['timestamp']])
    else:
        country_code = data['country']['code']
        country = data['country']['name']
        influx_payload.append(
            {
                "measurement": "login_attempt",
                "tags": {
                    "host": "$insert_your_honeypot_hostname_here",
                    "country_code": country_code,
                    "country": country,
                },
                "time": attempt['timestamp'],
                "fields": {
                    "source_ip": attempt['src_ip'],
                }
            }
        )
        # Console output
        print("Iteration: " + str(count))
        print("Timestamp: " + str(attempt['timestamp']))
        print("Source IP: " + str(attempt['src_ip']))
        print("Country: " + str(data['country']['name']))
        # print("City: " + str(data['city']))
    if (count % 1000) == 0:
        # Write to the DB every 1000 entries; the remainder is caught at the end.
        # This prevents a massive write failing midway through a huge import.
        # Format: host, port, user, pw, database
        influx = InfluxDBClient('$INFLUXDBHOST', 8086, '$DBUSER', '$DBPASSWORD', '$DB')
        influx.write_points(influx_payload)
        print("=== Wrote to DB ===")
        influx_payload = []
        print("=======")

# Write any remaining items to InfluxDB
influx = InfluxDBClient('$INFLUXDBHOST', 8086, '$DBUSER', '$DBPASSWORD', '$DB')
influx.write_points(influx_payload)
print("Wrote to DB")
print("===============")
print("Imported IPs")
for key, val in results.items():
    print(key, "=>", val.get('country'))

# Report any IPs we could not geolocate
if failed:
    print("Failed lookups")
    for src_ip, timestamp in failed:
        print(src_ip, "=>", timestamp)
Do you have this running as a cron job? Wouldn't it re-ingest already-parsed entries? Thanks.

I run it manually. My plan for automation was to fork pshitt and have it talk to Influx directly.
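
A minimal sketch of that idea, assuming a pshitt fork exposes some per-attempt hook (report_attempt below is a hypothetical callback, not an existing pshitt API), reusing the same measurement layout as the import script:

from influxdb import InfluxDBClient

influx = InfluxDBClient('$INFLUXDBHOST', 8086, '$DBUSER', '$DBPASSWORD', '$DB')

def report_attempt(src_ip, timestamp):
    # Write the point immediately instead of batch-importing the JSON later.
    # GeoIP enrichment could be added here the same way the import script does it.
    influx.write_points([{
        "measurement": "login_attempt",
        "tags": {"host": "$insert_your_honeypot_hostname_here"},
        "time": timestamp,
        "fields": {"source_ip": src_ip},
    }])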