Random forest in Python
# Gist kept as a record of the code from https://www.jiqizhixin.com/articles/bb59b879-e030-400d-abbc-d9c0708266ff
# Random Forest Algorithm on Sonar Dataset
from random import seed
from random import randrange
from csv import reader
from math import sqrt

# Load a CSV file
def load_csv(filename):
    dataset = list()
    with open(filename, 'r') as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:
                continue
            dataset.append(row)
    return dataset

# Convert string column to float
def str_column_to_float(dataset, column):
    for row in dataset:
        row[column] = float(row[column].strip())

# Convert string column to integer
def str_column_to_int(dataset, column):
    class_values = [row[column] for row in dataset]
    unique = set(class_values)
    lookup = dict()
    for i, value in enumerate(unique):
        lookup[value] = i
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup
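# Added example (not in the original gist): a quick sanity check that class
# labels are re-coded to integers in place; the exact codes depend on set
# iteration order, so we only check consistency.
_demo_rows = [[0.1, 'M'], [0.2, 'R'], [0.3, 'M']]
_demo_lookup = str_column_to_int(_demo_rows, 1)
assert sorted(_demo_lookup) == ['M', 'R']
assert _demo_rows[0][1] == _demo_lookup['M'] and _demo_rows[1][1] == _demo_lookup['R']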
# Split a dataset into k folds
def cross_validation_split(dataset, n_folds):
    dataset_split = list()
    dataset_copy = list(dataset)
    fold_size = len(dataset) // n_folds  # integer division; a float fold size exhausts dataset_copy and crashes under Python 3
    for i in range(n_folds):
        fold = list()
        while len(fold) < fold_size:
            index = randrange(len(dataset_copy))
            fold.append(dataset_copy.pop(index))
        dataset_split.append(fold)
    return dataset_split
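# Added example: 10 rows split evenly into 5 folds of 2 rows each.
assert [len(fold) for fold in cross_validation_split(list(range(10)), 5)] == [2, 2, 2, 2, 2]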
# Calculate accuracy percentage
def accuracy_metric(actual, predicted):
    correct = 0
    for i in range(len(actual)):
        if actual[i] == predicted[i]:
            correct += 1
    return correct / float(len(actual)) * 100.0

# Evaluate an algorithm using a cross validation split
def evaluate_algorithm(dataset, algorithm, n_folds, *args):
    folds = cross_validation_split(dataset, n_folds)
    scores = list()
    for fold in folds:
        train_set = list(folds)
        train_set.remove(fold)
        train_set = sum(train_set, [])
        test_set = list()
        for row in fold:
            row_copy = list(row)
            test_set.append(row_copy)
            row_copy[-1] = None  # hide the true label from the algorithm under test
        predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in fold]
        accuracy = accuracy_metric(actual, predicted)
        scores.append(accuracy)
    return scores
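# Added example: the harness accepts any algorithm(train, test, *args)
# function; a toy baseline that always predicts 0 yields one accuracy
# score per fold.
def _always_zero(train, test):
    return [0 for _ in test]
_toy_data = [[float(i), i % 2] for i in range(10)]
assert len(evaluate_algorithm(_toy_data, _always_zero, 5)) == 5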
# Split a dataset based on an attribute and an attribute value
def test_split(index, value, dataset):
    left, right = list(), list()
    for row in dataset:
        if row[index] < value:
            left.append(row)
        else:
            right.append(row)
    return left, right
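# Added example: rows with feature 0 below the split value go left, the rest right.
assert test_split(0, 5.0, [[1.0, 'a'], [9.0, 'b']]) == ([[1.0, 'a']], [[9.0, 'b']])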
# Calculate the Gini index for a split dataset
def gini_index(groups, class_values):
    gini = 0.0
    for class_value in class_values:
        for group in groups:
            size = len(group)
            if size == 0:
                continue
            proportion = [row[-1] for row in group].count(class_value) / float(size)
            gini += (proportion * (1.0 - proportion))
    return gini
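# Added sanity check: a perfectly pure split scores 0.0, while two 50/50
# groups score 1.0 under this (unweighted) Gini formulation.
assert gini_index([[[0], [0]], [[1], [1]]], [0, 1]) == 0.0
assert gini_index([[[0], [1]], [[0], [1]]], [0, 1]) == 1.0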
# Select the best split point for a dataset
def get_split(dataset, n_features):
    class_values = list(set(row[-1] for row in dataset))
    b_index, b_value, b_score, b_groups = 999, 999, 999, None
    features = list()
    while len(features) < n_features:
        index = randrange(len(dataset[0])-1)
        if index not in features:
            features.append(index)
    for index in features:
        for row in dataset:
            groups = test_split(index, row[index], dataset)
            gini = gini_index(groups, class_values)
            if gini < b_score:
                b_index, b_value, b_score, b_groups = index, row[index], gini, groups
    return {'index': b_index, 'value': b_value, 'groups': b_groups}

# Create a terminal node value
def to_terminal(group):
    outcomes = [row[-1] for row in group]
    return max(set(outcomes), key=outcomes.count)
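# Added example: a terminal node predicts the most common class in its group.
assert to_terminal([[0.5, 0], [0.6, 1], [0.7, 1]]) == 1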
# Create child splits for a node or make terminal
def split(node, max_depth, min_size, n_features, depth):
    left, right = node['groups']
    del(node['groups'])
    # check for a no split
    if not left or not right:
        node['left'] = node['right'] = to_terminal(left + right)
        return
    # check for max depth
    if depth >= max_depth:
        node['left'], node['right'] = to_terminal(left), to_terminal(right)
        return
    # process left child
    if len(left) <= min_size:
        node['left'] = to_terminal(left)
    else:
        node['left'] = get_split(left, n_features)
        split(node['left'], max_depth, min_size, n_features, depth+1)
    # process right child
    if len(right) <= min_size:
        node['right'] = to_terminal(right)
    else:
        node['right'] = get_split(right, n_features)
        split(node['right'], max_depth, min_size, n_features, depth+1)
# Build a decision tree
def build_tree(train, max_depth, min_size, n_features):
    root = get_split(train, n_features)  # fixed: was get_split(dataset, ...), which silently used the global dataset instead of the bootstrap sample
    split(root, max_depth, min_size, n_features, 1)
    return root
# Make a prediction with a decision tree
def predict(node, row):
    if row[node['index']] < node['value']:
        if isinstance(node['left'], dict):
            return predict(node['left'], row)
        else:
            return node['left']
    else:
        if isinstance(node['right'], dict):
            return predict(node['right'], row)
        else:
            return node['right']
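# Added example: a hand-built one-split stump; rows with feature 0 below 5.0
# fall to the left leaf (class 0), everything else to the right leaf (class 1).
_stump = {'index': 0, 'value': 5.0, 'left': 0, 'right': 1}
assert predict(_stump, [3.2]) == 0
assert predict(_stump, [7.8]) == 1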
# Create a random subsample from the dataset with replacement
def subsample(dataset, ratio):
    sample = list()
    n_sample = round(len(dataset) * ratio)
    while len(sample) < n_sample:
        index = randrange(len(dataset))
        sample.append(dataset[index])
    return sample
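# Added example: a bootstrap sample has round(len(dataset) * ratio) rows,
# drawn with replacement, so duplicate rows are possible.
assert len(subsample([[0], [1], [2], [3]], 0.5)) == 2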
# Make a prediction with a list of bagged trees
def bagging_predict(trees, row):
    predictions = [predict(tree, row) for tree in trees]
    return max(set(predictions), key=predictions.count)
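# Added example: the ensemble returns the majority vote; for this row two
# stumps vote class 0 and one votes class 1.
_s1 = {'index': 0, 'value': 9.0, 'left': 0, 'right': 1}
_s2 = {'index': 0, 'value': 5.0, 'left': 0, 'right': 1}
assert bagging_predict([_s1, _s1, _s2], [6.0]) == 0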
# Random Forest Algorithm
def random_forest(train, test, max_depth, min_size, sample_size, n_trees, n_features):
    trees = list()
    for i in range(n_trees):
        sample = subsample(train, sample_size)
        tree = build_tree(sample, max_depth, min_size, n_features)
        trees.append(tree)
    predictions = [bagging_predict(trees, row) for row in test]
    return predictions
# Test the random forest algorithm
seed(1)
# load and prepare data
filename = 'sonar.all-data.csv'
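# Added note: this is the UCI "Connectionist Bench (Sonar, Mines vs. Rocks)"
# dataset (208 rows of 60 numeric features plus an 'M'/'R' class label);
# the file must be present in the working directory.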
dataset = load_csv(filename)
# convert string attributes to floats
for i in range(0, len(dataset[0])-1):
    str_column_to_float(dataset, i)
# convert class column to integers
str_column_to_int(dataset, len(dataset[0])-1)
# evaluate algorithm
n_folds = 5
max_depth = 10
min_size = 1
sample_size = 1.0
n_features = int(sqrt(len(dataset[0])-1))
for n_trees in [1, 5, 10]:
    scores = evaluate_algorithm(dataset, random_forest, n_folds, max_depth, min_size, sample_size, n_trees, n_features)
    print('Trees: %d' % n_trees)
    print('Scores: %s' % scores)
    print('Mean Accuracy: %.3f%%' % (sum(scores)/float(len(scores))))
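# Added example (not in the original gist): after evaluation you might train a
# final forest on the full dataset and classify a single row; this reuses
# subsample, build_tree and bagging_predict exactly as random_forest() does.
final_trees = [build_tree(subsample(dataset, sample_size), max_depth, min_size, n_features)
               for _ in range(5)]
print('Predicted class for first row: %s' % bagging_predict(final_trees, dataset[0]))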