from sklearn.feature_extraction import DictVectorizer
from tqdm import tqdm_notebook
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.utils import shuffle
import glob
import os
from msbase.utils import load_json
import matplotlib.pyplot as plt  # fix: plt is used below but was never imported
from matplotlib.pyplot import cm
import pandas as pd
import numpy as np
import json
def load_vectors(vectors_dir: str, labels):
    """Load per-label feature vectors from "<label>-vectors.json" files.

    Returns the feature matrix DX, multi-class labels DY, binary
    benign/malicious labels DZ, the feature names, and the APK list."""
    DX = []
    DY = []
    DZ = []
    DAPKs = []
    for i, label in enumerate(labels):
        vectors = json.load(open(vectors_dir + '/' + label + "-vectors.json", "r"))
        DAPKs += [apk for apk, v in vectors]
        DX += [v for apk, v in vectors]
        DY += [i] * len(vectors)
        if label == "benign":
            DZ += [0] * len(vectors)
        else:
            DZ += [1] * len(vectors)
    if isinstance(DX[0], dict):
        # Dict-shaped features: encode them into a dense numeric matrix
        v = DictVectorizer(sparse=False)
        DX = v.fit_transform(DX)
        feature_names = v.feature_names_
    else:
        DX = np.array(DX)
        feature_names = None  # fix: was left undefined on this branch
    return DX, np.array(DY), DZ, feature_names, DAPKs
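# Usage sketch (hypothetical directory and label set, not from this gist):
# each "<label>-vectors.json" is assumed to be a list of [apk, features] pairs.
#
# labels = ["benign", "adware", "ransomware"]
# DX, DY, DZ, feature_names, DAPKs = load_vectors("vectors", labels)
# print(DX.shape, len(DAPKs))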
def classify_fold(train_X, train_Y, test_X, test_Y,
                  labels, feature_names,
                  n_estimators, max_features, max_depth, report=False):
    classifier = RandomForestClassifier(max_depth=max_depth, n_estimators=n_estimators,
                                        max_features=max_features, n_jobs=6, random_state=33)
    classifier.fit(train_X, train_Y)
    pred_Y = classifier.predict(test_X)
    pred_proba_Y = classifier.predict_proba(test_X)
    # Pad probability columns with zeros for classes absent from this fold's
    # training set (assumes the missing classes are the highest-numbered ones)
    for i in range(pred_proba_Y.shape[1], len(labels)):
        pred_proba_Y = np.insert(pred_proba_Y, i, 0, axis=1)
    if report:
        feature_importances = pd.DataFrame(classifier.feature_importances_,
                                           index=feature_names,
                                           columns=['importance']).sort_values('importance', ascending=False)
        # precision_recall_fscore_support(test_Y, pred_Y, labels=labels) + \
        return (None, None, None, None, feature_importances, test_Y, pred_Y, pred_proba_Y)
    return classifier.score(test_X, test_Y)
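# Sanity-check sketch on synthetic data (make_classification and the 80/20
# split are illustrative assumptions, not part of the original pipeline):
#
# from sklearn.datasets import make_classification
# X_demo, y_demo = make_classification(n_samples=200, n_features=20, random_state=0)
# acc = classify_fold(X_demo[:160], y_demo[:160], X_demo[160:], y_demo[160:],
#                     labels=[0, 1], feature_names=None,
#                     n_estimators=50, max_features=10, max_depth=None)
# print("held-out accuracy:", acc)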
def classify(DX, DY, labels, feature_names, DAPKs, n_estimators, max_features,
             split_ratio, max_depth, report=False):
    # Shuffle features, labels, and APK names together, then split by ratio
    X, Y, APKs = shuffle(DX, DY, DAPKs)
    classifier = RandomForestClassifier(max_depth=max_depth, n_estimators=n_estimators,
                                        max_features=max_features, n_jobs=6)
    train_size = int(len(Y) * split_ratio)
    train_X = X[:train_size]
    train_Y = Y[:train_size]
    classifier.fit(train_X, train_Y)
    test_X = X[train_size:]
    test_Y = Y[train_size:]
    test_APKs = APKs[train_size:]
    pred_Y = classifier.predict(test_X)
    pred_proba_Y = classifier.predict_proba(test_X)
    if report:
        feature_importances = pd.DataFrame(classifier.feature_importances_,
                                           index=feature_names,
                                           columns=['importance']).sort_values('importance', ascending=False)
        return precision_recall_fscore_support(test_Y, pred_Y, labels=labels) + \
               (feature_importances, test_Y, pred_Y, pred_proba_Y, test_APKs)
    return classifier.score(test_X, test_Y)
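# Report-mode sketch (the unpacking below mirrors the tuple layout returned
# above: the four precision_recall_fscore_support arrays, then the extras;
# variable names are ours):
#
# prec, rec, f1, support, fi, y_true, y_pred, y_proba, test_apks = \
#     classify(DX, DY, labels=[0, 1], feature_names=feature_names, DAPKs=DAPKs,
#              n_estimators=100, max_features=20, split_ratio=0.7,
#              max_depth=None, report=True)
# print(fi.head(10))  # ten most important features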
def classify_knn(DX, DY, labels, split_ratio=0.7, n_neighbors=3, report=False):
    X, Y = shuffle(DX, DY)
    classifier = KNeighborsClassifier(n_neighbors=n_neighbors)
    train_size = int(len(Y) * split_ratio)
    train_X = X[:train_size]
    train_Y = Y[:train_size]
    classifier.fit(train_X, train_Y)
    test_X = X[train_size:]
    test_Y = Y[train_size:]
    pred_Y = classifier.predict(test_X)
    if report:
        return precision_recall_fscore_support(test_Y, pred_Y, labels=labels)
    return classifier.score(test_X, test_Y)
def classify_svm(DX, DY, labels, kernel="rbf", split_ratio=0.7, report=False):
    X, Y = shuffle(DX, DY)
    classifier = svm.SVC(kernel=kernel)
    train_size = int(len(Y) * split_ratio)
    train_X = X[:train_size]
    train_Y = Y[:train_size]
    classifier.fit(train_X, train_Y)
    test_X = X[train_size:]
    test_Y = Y[train_size:]
    pred_Y = classifier.predict(test_X)
    if report:
        return precision_recall_fscore_support(test_Y, pred_Y, labels=labels), classifier
    return classifier.score(test_X, test_Y)
def matrix(DX, DY, labels):
    """Sweep n_estimators x max_features and plot mean accuracy."""
    _, n_feats = DX.shape
    estimate_scores = {}
    for n_estimators in [2, 20, 60, 80, 100, 160, 200]:
        if n_estimators > n_feats:
            continue
        estimate_scores[n_estimators] = {}
        for max_features in [2, 20, 60, 80, 100, 160, 200]:
            if max_features > n_feats:
                continue
            scores = []
            for i in range(10):
                # fix: classify also requires feature_names, DAPKs, and max_depth;
                # placeholders suffice since report=False and the APKs are unused here
                scores.append(classify(DX, DY, labels=labels,
                                       feature_names=None,
                                       DAPKs=list(range(len(DY))),
                                       n_estimators=n_estimators,
                                       max_features=max_features,
                                       split_ratio=0.7, max_depth=None))
            score = np.mean(scores)
            estimate_scores[n_estimators][max_features] = score
    color = cm.rainbow(np.linspace(0, 1, len(estimate_scores)))
    n_estimators_map = dict(zip(estimate_scores.keys(), range(len(estimate_scores))))
    for n_estimators, scores in estimate_scores.items():
        xs, ys = zip(*scores.items())
        plt.plot(xs, ys, c=color[n_estimators_map[n_estimators]], label=str(n_estimators))
    plt.xlabel("max_features")
    plt.ylabel("accuracy")
    plt.legend()
    plt.show()
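# Alternative sketch: the same hyperparameter sweep via sklearn's GridSearchCV
# (cross-validated rather than repeated random splits; the grid values mirror
# the loops above, and values above the feature count should be filtered out
# first, as those loops do):
#
# from sklearn.model_selection import GridSearchCV
# grid = GridSearchCV(RandomForestClassifier(n_jobs=6),
#                     param_grid={"n_estimators": [2, 20, 60, 80, 100, 160, 200],
#                                 "max_features": [2, 20, 60, 80, 100, 160, 200]},
#                     cv=10)
# grid.fit(DX, DY)
# print(grid.best_params_, grid.best_score_)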
def avg_eval(DX, DY, DAPKs, combined_labels, combined_labels_index, feature_names,
             max_features, n_estimators, n_fold, max_depth):
    """Run n_fold-fold cross-validation and pool per-fold predictions."""
    feature_importances_s = []
    y_true_all = []
    y_pred_all = []
    y_pred_proba_all = []
    APKs_test_all = []
    kf = KFold(n_splits=n_fold, shuffle=True, random_state=36)
    for train_index, test_index in kf.split(DX):
        _, _, _, _, feature_importances, y_true, y_pred, y_pred_proba = \
            classify_fold(DX[train_index], DY[train_index], DX[test_index], DY[test_index],
                          combined_labels_index, feature_names,
                          max_features=max_features, n_estimators=n_estimators,
                          max_depth=max_depth,
                          report=True)
        feature_importances_s.append(feature_importances)
        y_true_all.append(y_true)
        y_pred_all.append(y_pred)
        y_pred_proba_all.append(y_pred_proba)
        APKs_test_all += list(pd.DataFrame(DAPKs, columns=["APK"]).loc[test_index]["APK"])
    return None, feature_importances_s, \
           np.concatenate(y_true_all), np.concatenate(y_pred_all), \
           np.concatenate(y_pred_proba_all), APKs_test_all
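# Follow-up sketch: pool the per-fold outputs into a single confusion matrix
# (confusion_matrix is already imported above; variable names are ours):
#
# _, fi_s, y_true, y_pred, y_proba, test_apks = \
#     avg_eval(DX, DY, DAPKs, combined_labels, combined_labels_index,
#              feature_names, max_features=20, n_estimators=100,
#              n_fold=10, max_depth=None)
# print(confusion_matrix(y_true, y_pred))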
# NOTE: KMeans isn't very good here
# kmeans = KMeans(n_clusters=len(combined_labels))
# y_pred = kmeans.fit_predict(DX)
# mat = confusion_matrix(DY, y_pred).T
# mat
# size_array = np.array([n for l, n in label_stat])
# size_array
# mat = (mat / size_array)
# sn.heatmap(mat,
#            xticklabels=labels,
#            yticklabels=range(len(labels)))
# plt.xlabel('true label')
# plt.ylabel('predicted label')
def load_vt_stat(apks):
    """Look up VirusTotal verdicts for each APK in `apks`.

    Returns two parallel 0/1 lists: whether any scanner flagged the APK,
    and whether a majority of scanners did."""
    os.chdir("../..")
    metadata_paths = []
    # if not gapps_only:
    metadata_paths.extend(glob.glob("samples_metadata/*/*.test.json"))
    # metadata_paths.extend(glob.glob("all_samples_eval/*.test.json"))
    # Leftover from an earlier per-label statistics version:
    # label_samples = {}
    # bin_samples = { True: [], False: [] }  # is_benign
    # vt_stat = {}
    # vt_stat_bin = {}
    # for metadata_path in metadata_paths:
    #     testset_json = load_json(metadata_path)
    #     if gapps_only:
    #         label = os.path.basename(metadata_path).split(".")[0]
    #     for test_data in testset_json:
    #         if not gapps_only:
    #             label = test_data['label']
    #         if label not in label_samples:
    #             label_samples[label] = []
    #         bin_label = test_data['label'] == "benign"
    #         label_samples[label].append(test_data)
    #         bin_samples[bin_label].append(test_data)
    apks_is_malicious = {}
    apks_is_malicious_major = {}
    for metadata_path in metadata_paths:
        for test_data in load_json(metadata_path):
            if test_data["apk"] in apks:
                assert "virustotal" in test_data, test_data['apk']
                vt_report = test_data["virustotal"]
                assert "positives" in vt_report and "scans" in vt_report
                # vt_frac_positives += int(vt_report["positives"]) / len(vt_report["scans"])
                # print(vt_report["positives"], len(vt_report["scans"]))
                assert len(vt_report["scans"]) == vt_report["total"]
                # Majority vote vs. at-least-one-detection
                vt_major = vt_report["positives"] > int(len(vt_report["scans"]) * 0.5)
                vt_exist = vt_report["positives"] >= 1
                apks_is_malicious[test_data['apk']] = vt_exist
                apks_is_malicious_major[test_data['apk']] = vt_major
    ret = [int(apks_is_malicious[apk]) for apk in apks]
    ret_major = [int(apks_is_malicious_major[apk]) for apk in apks]
    assert len(ret) == len(apks)
    os.chdir("eval/ase19")
    return ret, ret_major
# Leftover from the same earlier per-label statistics version:
#             vt_total += 1
#     if vt_total > 0:
#         vt_stat[label] = {
#             "vt_frac": vt_frac_positives / vt_total,
#             "vt_exist": vt_exist_positives / vt_total,
#             "vt_major": vt_major_positives / vt_total,
#             "vt_support": vt_total,
#         }
# for label, samples in bin_samples.items():
#     vt_frac_positives = 0
#     vt_exist_positives = 0
#     vt_major_positives = 0
#     vt_total = 0
#     for test_data in samples:
#         if "virustotal" in test_data:
#             vt_report = test_data["virustotal"]
#             if "positives" in vt_report and "scans" in vt_report:
#                 vt_frac_positives += int(vt_report["positives"]) / len(vt_report["scans"])
#                 vt_exist_positives += int(vt_report["positives"] > 1)
#                 vt_major_positives += int(vt_report["positives"] > len(vt_report["scans"]) * 0.5)
#                 vt_total += 1
#     if vt_total > 0:
#         vt_stat_bin[label] = {
#             "vt_frac": vt_frac_positives / vt_total,
#             "vt_exist": vt_exist_positives / vt_total,
#             "vt_major": vt_major_positives / vt_total,
#             "vt_support": vt_total,
#         }
# vt_result_df = pd.DataFrame(vt_stat).T
# vt_stat_bin_df = pd.DataFrame(vt_stat_bin).T
# vt_stat_bin_df = vt_stat_bin_df.rename(index={True: "benign", False: "malicious"}).drop(["vt_frac", "vt_major"], axis=1)
# return vt_result_df, vt_stat_bin_df
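# Usage sketch for load_vt_stat (assumes the working directory layout that the
# chdir calls above expect, and that `test_apks` comes from avg_eval's output):
#
# vt_any, vt_major = load_vt_stat(test_apks)
# print("flagged by >=1 scanner:", sum(vt_any), "of", len(vt_any))
# print("flagged by a majority: ", sum(vt_major), "of", len(vt_major))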
# FIXME: PCA is not good
# from sklearn.decomposition import PCA
# from mpl_toolkits.mplot3d import Axes3D
# pca = PCA(n_components=3)
# pca_2 = PCA(n_components=2)
# components = pca.fit_transform(DX)
# components_2 = pca_2.fit_transform(DX)
# result = pd.DataFrame(components, columns=['PCA%i' % i for i in range(3)])
# print(result.shape)
# result_2 = pd.DataFrame(components_2, columns=['PCA%i' % i for i in range(2)])
# print(result_2.shape)
# def plot(color_map, DY, labels):
#     colors = [color_map[y] for y in DY]
#     # Plot initialisation
#     fig = plt.figure(figsize=(8, 6))
#     ax = Axes3D(fig)
#     ax.scatter(result['PCA0'], result['PCA1'], result['PCA2'], c=colors, cmap="Set2_r", s=60)
#     # make simple, bare axis lines through space:
#     xAxisLine = ((min(result['PCA0']), max(result['PCA0'])), (0, 0), (0, 0))
#     ax.plot(xAxisLine[0], xAxisLine[1], xAxisLine[2], 'r')
#     yAxisLine = ((0, 0), (min(result['PCA1']), max(result['PCA1'])), (0, 0))
#     ax.plot(yAxisLine[0], yAxisLine[1], yAxisLine[2], 'r')
#     zAxisLine = ((0, 0), (0, 0), (min(result['PCA2']), max(result['PCA2'])))
#     ax.plot(zAxisLine[0], zAxisLine[1], zAxisLine[2], 'r')
#     # label the axes
#     ax.set_xlabel("PC1")
#     ax.set_ylabel("PC2")
#     ax.set_zlabel("PC3")
#     markers = [plt.Line2D([0, 0], [0, 0], color=color, marker='o', linestyle='') for color in color_map]
#     plt.legend(markers, labels, numpoints=1)
# color_map = cm.rainbow(np.linspace(0, 1, len(combined_labels)))
# plot(color_map, DY, combined_labels)
# color_map_2 = cm.rainbow(np.linspace(0, 1, 2))
# plot(color_map_2, DZ, ["benign", "malicious"])
# DX_pca = result
# DX_pca.shape
# results, classifier = classify_svm(result_2, DZ, labels=[0, 1], report=True)
# results
# plt.figure(1, figsize=(4, 3))
# colors_2 = [color_map_2[y] for y in DZ]
# plt.scatter(result_2['PCA0'], result_2['PCA1'], c=colors_2, zorder=10, cmap=plt.cm.Paired,
#             edgecolors='k')  # fix: was mixing result_2 and result
# plt.scatter(classifier.support_vectors_[:, 0], classifier.support_vectors_[:, 1], s=80,
#             facecolors='none', zorder=10, edgecolors='k')
# plt.axis('tight')
# x_min = -4
# x_max = 4
# y_min = -4
# y_max = 4
# XX, YY = np.mgrid[x_min:x_max:200j, y_min:y_max:200j]
# Z = classifier.decision_function(np.c_[XX.ravel(), YY.ravel()])
# Put the result into a color plot
# Z = Z.reshape(XX.shape)
# plt.figure(1, figsize=(4, 3))
# plt.pcolormesh(XX, YY, Z > 0, cmap=plt.cm.Paired)
# plt.contour(XX, YY, Z, colors=['k', 'k', 'k'], linestyles=['--', '-', '--'],
#             levels=[-1, -.5, 0, .5, 1])
# plt.xlim(x_min, x_max)
# plt.ylim(y_min, y_max)
# plt.xticks(())
# plt.yticks(())