Created
June 16, 2017 13:26
-
-
Save ryangmolina/e1c87509b6919ac8aaf3eceb315d3e5e to your computer and use it in GitHub Desktop.
A simple Python implementation of Mamdani fuzzy logic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def trimf(x, points):
    """Evaluate a triangular membership function at ``x``.

    ``points`` holds the three vertices ``[a, b, c]``: membership rises
    linearly from 0 at ``a`` to 1 at ``b`` and falls back to 0 at ``c``.
    Any ``x`` outside ``[a, c]`` yields 0.
    """
    foot_left, peak, foot_right = points[0], points[1], points[2]

    # Rising edge (both ends inclusive, so x == peak is handled here).
    if foot_left <= x <= peak:
        rise = getSlope(foot_left, 0, peak, 1)
        return rise * x + getYIntercept(foot_left, 0, peak, 1)

    # Falling edge.
    if peak <= x <= foot_right:
        fall = getSlope(peak, 1, foot_right, 0)
        return fall * x + getYIntercept(peak, 1, foot_right, 0)

    return 0
def trapmf(x, points):
    """Evaluate a trapezoidal membership function at ``x``.

    ``points`` holds the four vertices ``[a, b, c, d]``: membership rises
    from 0 at ``a`` to 1 at ``b``, stays at 1 through ``c`` and falls back
    to 0 at ``d``. Any ``x`` outside the open interval ``(a, d)`` that is
    not on the plateau yields 0.
    """
    foot_left, top_left, top_right, foot_right = (
        points[0], points[1], points[2], points[3]
    )

    # Plateau, both ends inclusive.
    if top_left <= x <= top_right:
        return 1

    # Rising edge, strictly between the left foot and the plateau.
    if foot_left < x < top_left:
        rise = getSlope(foot_left, 0, top_left, 1)
        return rise * x + getYIntercept(foot_left, 0, top_left, 1)

    # Falling edge, strictly between the plateau and the right foot.
    if top_right < x < foot_right:
        fall = getSlope(top_right, 1, foot_right, 0)
        return fall * x + getYIntercept(top_right, 1, foot_right, 0)

    return 0
def getSlope(x1, y1, x2, y2):
    """Return the slope of the line through ``(x1, y1)`` and ``(x2, y2)``.

    A vertical line (``x1 == x2``) has no finite slope; 0 is returned in
    that case so that shouldered membership functions, whose edge
    collapses to a vertical segment, do not raise.
    """
    rise = y2 - y1
    run = x2 - x1
    try:
        return rise / run
    except ZeroDivisionError:
        return 0
def getYIntercept(x1, y1, x2, y2):
    """Return the y-intercept of the line through the two given points.

    The intercept is computed from whichever point has the larger ``y``
    (the first point on a tie). When ``getSlope`` reports 0 for a
    vertical segment this makes the intercept equal to that larger ``y``.
    """
    slope = getSlope(x1, y1, x2, y2)
    anchor_x, anchor_y = (x2, y2) if y1 < y2 else (x1, y1)
    return anchor_y - slope * anchor_x
def getTrimfPlots(start, end, points):
    """Sample a triangular membership function on the integers in [start, end).

    Args:
        start: First integer of the sampled universe (inclusive).
        end: Last integer of the sampled universe (exclusive).
        points: Integer vertices ``[a, b, c]`` of the triangle; membership
            rises from 0 at ``a`` to 1 at ``b`` and falls to 0 at ``c``.

    Returns:
        A list of ``end - start`` values; element ``k`` is the membership
        of the value ``start + k``. Samples outside ``[a, c)`` stay 0.
    """
    pointA, pointB, pointC = points[0], points[1], points[2]

    # One slot per integer in [start, end). The previous abs(start) + abs(end)
    # sizing only matched this when start <= 0 <= end.
    plots = [0] * (end - start)

    slopeAB = getSlope(pointA, 0, pointB, 1)
    slopeBC = getSlope(pointB, 1, pointC, 0)
    yInterceptAB = getYIntercept(pointA, 0, pointB, 1)
    yInterceptBC = getYIntercept(pointB, 1, pointC, 0)

    def span(lo, hi):
        # Clamp to the sampled universe so vertices outside it cannot
        # index past either end of the list.
        return range(max(lo, start), min(hi, end))

    # Store each sample at its offset from start; indexing by the raw value
    # misplaced (or wrapped) samples whenever start was not 0.
    for i in span(pointA, pointB):
        plots[i - start] = slopeAB * i + yInterceptAB
    for i in span(pointB, pointC):
        plots[i - start] = slopeBC * i + yInterceptBC
    return plots
def getTrapmfPlots(start, end, points, shoulder=None):
    """Sample a trapezoidal membership function on the integers in [start, end).

    Args:
        start: First integer of the sampled universe (inclusive).
        end: Last integer of the sampled universe (exclusive).
        points: Integer vertices ``[a, b, c, d]`` of the trapezoid;
            membership rises from 0 at ``a`` to 1 at ``b``, stays 1 up to
            ``c`` and falls to 0 at ``d``.
        shoulder: ``"left"`` sets membership 1 for every sample below
            ``a``; ``"right"`` sets membership 1 for every sample from
            ``d`` upwards; any other value adds no shoulder.

    Returns:
        A list of ``end - start`` values; element ``k`` is the membership
        of the value ``start + k``.
    """
    pointA, pointB, pointC, pointD = points[0], points[1], points[2], points[3]

    # One slot per integer in [start, end). The previous abs(start) + abs(end)
    # sizing only matched this when start <= 0 <= end.
    plots = [0] * (end - start)

    slopeAB = getSlope(pointA, 0, pointB, 1)
    slopeCD = getSlope(pointC, 1, pointD, 0)
    yInterceptAB = getYIntercept(pointA, 0, pointB, 1)
    yInterceptCD = getYIntercept(pointC, 1, pointD, 0)

    def span(lo, hi):
        # Clamp to the sampled universe so vertices outside it cannot
        # index past either end of the list.
        return range(max(lo, start), min(hi, end))

    # Samples are stored at their offset from start; indexing by the raw
    # value misplaced (or wrapped) samples whenever start was not 0.
    if shoulder == "left":
        for i in span(start, pointA):
            plots[i - start] = 1
    elif shoulder == "right":
        for i in span(pointD, end):
            plots[i - start] = 1

    for i in span(pointA, pointB):
        plots[i - start] = slopeAB * i + yInterceptAB
    for i in span(pointB, pointC):
        plots[i - start] = 1
    for i in span(pointC, pointD):
        plots[i - start] = slopeCD * i + yInterceptCD
    return plots
def getCentroid(aggregatedPlots):
    """Return the centroid of a sampled membership curve.

    Each sample's list index is used as its x coordinate, so the result
    is ``sum(i * y_i) / sum(y_i)``. Raises ``ZeroDivisionError`` when the
    samples sum to zero (including an empty list).
    """
    weighted_total = 0
    membership_total = 0
    # Accumulate in index order with a plain loop so the floating-point
    # result matches a straightforward left-to-right summation.
    for position, membership in enumerate(aggregatedPlots):
        weighted_total += position * membership
        membership_total += membership
    return weighted_total / membership_total
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fuzzy_logic import * | |
def main():
    """Run the temperature controller once from console input.

    Reads the target, current and previous temperatures, derives the
    error and its change, evaluates the rule base, aggregates the three
    output sets and prints the error, the error change and the
    defuzzified centroid.
    """
    prompts = (
        'Enter Target Temperature: ',
        'Enter Current Temperature: ',
        'Enter Previous Temperature: ',
    )
    targetTemp, currentTemp, prevTemp = (float(input(text)) for text in prompts)

    error = targetTemp - currentTemp
    # Change in error between the previous and the current reading.
    errorDerivative = (targetTemp - prevTemp) - error

    firedRules = evaluateRules(error, errorDerivative)
    aggregated = fisAggregation(
        firedRules,
        fuzzifyOutputCooler(),
        fuzzifyOutputNoChange(),
        fuzzifyOutputHeater(),
    )
    centroid = getCentroid(aggregated)

    for value in (error, errorDerivative, centroid):
        print(value)
def evaluateRules(error, errorDerivative):
    """Return the 3x3 matrix of rule firing strengths.

    Row ``r`` corresponds to the error-derivative set (negative, zero,
    positive) and column ``c`` to the error set (negative, zero,
    positive). Each cell is the minimum of the two memberships, i.e. the
    AND of the rule's two antecedents.
    """
    errorSets = (
        fuzzifyErrorNeg(error),
        fuzzifyErrorZero(error),
        fuzzifyErrorPos(error),
    )
    errorDotSets = (
        fuzzifyErrorDotNeg(errorDerivative),
        fuzzifyErrorDotZero(errorDerivative),
        fuzzifyErrorDotPos(errorDerivative),
    )
    # Rules 1-9 in row-major order: rule (3 * r + c + 1) lives at [r][c].
    return [
        [min(errorMembership, dotMembership) for errorMembership in errorSets]
        for dotMembership in errorDotSets
    ]
def fuzzifyErrorPos(error):
    """Return the membership of ``error`` in the positive-error set.

    Evaluates ``trimf`` with vertices 0, 5, 5.
    """
    vertices = [0, 5, 5]
    return trimf(error, vertices)
def fuzzifyErrorZero(error):
    """Return the membership of ``error`` in the zero-error set.

    Evaluates ``trimf`` with vertices -5, 0, 5.
    """
    vertices = [-5, 0, 5]
    return trimf(error, vertices)
def fuzzifyErrorNeg(error):
    """Return the membership of ``error`` in the negative-error set.

    Evaluates ``trimf`` with vertices -5, -5, 0.
    """
    vertices = [-5, -5, 0]
    return trimf(error, vertices)
def fuzzifyErrorDotPos(errorDot):
    """Return the membership of ``errorDot`` in the positive-change set.

    Evaluates ``trapmf`` with vertices 1, 1.5, 5, 5.
    """
    vertices = [1, 1.5, 5, 5]
    return trapmf(errorDot, vertices)
def fuzzifyErrorDotZero(errorDot):
    """Return the membership of ``errorDot`` in the zero-change set.

    Evaluates ``trimf`` with vertices -2, 0, 2.
    """
    vertices = [-2, 0, 2]
    return trimf(errorDot, vertices)
def fuzzifyErrorDotNeg(errorDot):
    """Return the membership of ``errorDot`` in the negative-change set.

    Evaluates ``trapmf`` with vertices -5, -5, -1.5, -1.
    """
    vertices = [-5, -5, -1.5, -1]
    return trapmf(errorDot, vertices)
def fuzzifyOutputCooler():
    """Return the sampled "cooler" output set over the range 0..200.

    Built by ``getTrapmfPlots`` from vertices 0, 0, 30, 95 with a left
    shoulder.
    """
    vertices = [0, 0, 30, 95]
    return getTrapmfPlots(0, 200, vertices, shoulder="left")
def fuzzifyOutputNoChange():
    """Return the sampled "no change" output set over the range 0..200.

    Built by ``getTrimfPlots`` from vertices 90, 100, 110.
    """
    vertices = [90, 100, 110]
    return getTrimfPlots(0, 200, vertices)
def fuzzifyOutputHeater():
    """Return the sampled "heater" output set over the range 0..200.

    Built by ``getTrapmfPlots`` from vertices 105, 170, 200, 200 with a
    right shoulder.
    """
    vertices = [105, 170, 200, 200]
    return getTrapmfPlots(0, 200, vertices, shoulder="right")
def fisAggregation(rules, pcc, pcnc, pch):
    """Aggregate the clipped output sets of all fired rules.

    Args:
        rules: Rows of three firing strengths; column 0 drives ``pcc``,
            column 1 drives ``pcnc`` and column 2 drives ``pch``.
        pcc: Sampled membership values of the first output set.
        pcnc: Sampled membership values of the second output set.
        pch: Sampled membership values of the third output set.

    Returns:
        A list of 200 values. Each output set is clipped (``min``) at the
        strength of every rule that fires it, and the clipped curves are
        combined with ``max``.
    """
    # (rule column, sampled set, first index, one-past-last index).
    # Index windows mirror the original guards: i < 95, 90 < i < 110,
    # 105 < i < 200.
    outputSets = (
        (0, pcc, 0, 95),
        (1, pcnc, 91, 110),
        (2, pch, 106, 200),
    )

    result = [0] * 200
    for rule in rules:
        for column, plots, first, stop in outputSets:
            strength = rule[column]
            if strength > 0:
                for i in range(first, stop):
                    clipped = min(strength, plots[i])
                    # Keep the maximum. Plain assignment let a weaker rule,
                    # or an overlapping set processed later, overwrite a
                    # stronger contribution.
                    if clipped > result[i]:
                        result[i] = clipped
    return result
# Script entry point: run the temperature controller only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fuzzy_logic import * | |
def main():
    """Run the traffic controller once from console input.

    Reads the occupancy factor, average distance and traffic intensity,
    scales each by 100, evaluates the rule base, aggregates the seven
    output sets and prints the centroid divided by 100.
    """
    def readScaled(prompt):
        # Inputs are multiplied by 100 before being fuzzified.
        return float(input(prompt)) * 100

    occupancyFactor = readScaled('Enter occupancy factor: ')
    averageDistance = readScaled('Enter average distance: ')
    trafficIntensity = readScaled('Enter traffic intensity factor: ')

    firedRules = evaluateRules(occupancyFactor, averageDistance, trafficIntensity)

    builders = (
        ('vs', getVSPlots),
        ('s', getSPlots),
        ('rs', getRSPlots),
        ('m', getMPlots),
        ('rl', getRLPlots),
        ('l', getLPlots),
        ('vl', getVLPlots),
    )
    outputMfs = {label: build() for label, build in builders}

    aggregated = fisAggregation(firedRules, outputMfs)
    print(getCentroid(aggregated) / 100)
def fisAggregation(rules, outputMfs):
    """Aggregate the clipped output sets of all fired rules.

    Args:
        rules: Rows of seven firing strengths; columns 0..6 drive the
            output sets ``'vs'``, ``'s'``, ``'rs'``, ``'m'``, ``'rl'``,
            ``'l'`` and ``'vl'`` respectively.
        outputMfs: Mapping from those seven keys to sampled membership
            lists.

    Returns:
        A list of 100 values. Each output set is clipped (``min``) at the
        strength of every rule that fires it, and the clipped curves are
        combined with ``max``.
    """
    # (rule column, sampled set, first index, one-past-last index).
    # Index windows mirror the original guards, e.g. 15 < i < 35 -> 16..34.
    outputSets = (
        (0, outputMfs['vs'], 0, 20),
        (1, outputMfs['s'], 16, 35),
        (2, outputMfs['rs'], 31, 45),
        (3, outputMfs['m'], 41, 60),
        (4, outputMfs['rl'], 56, 70),
        (5, outputMfs['l'], 66, 85),
        (6, outputMfs['vl'], 81, 100),
    )

    aggregatePlots = [0] * 100
    for rule in rules:
        for column, plots, first, stop in outputSets:
            strength = rule[column]
            if strength > 0:
                for i in range(first, stop):
                    clipped = min(strength, plots[i])
                    # Keep the maximum. Plain assignment let a weaker rule,
                    # or an overlapping set processed later, overwrite a
                    # stronger contribution.
                    if clipped > aggregatePlots[i]:
                        aggregatePlots[i] = clipped
    return aggregatePlots
def evaluateRules(occupancyFactor, averageDistance, trafficIntensity):
    """Return the 27x7 matrix of rule firing strengths.

    Inputs:
        "m" occupancy factor  - low, medium, high
        "s" average distance  - short, medium, long
        "p" traffic intensity - low, medium, high

    Each of the 27 rows is one rule (one combination of the three input
    sets). Each of the 7 columns is one output set of "n":
    VS=0, S=1, RS=2, M=3, RL=4, L=5, VL=6. A rule's strength, the minimum
    of its three input memberships, is stored in the column of its
    output set; every other cell of the row stays 0.
    """
    occupancySets = (
        fuzzifyOccupancyLow(occupancyFactor),
        fuzzifyOccupancyMedium(occupancyFactor),
        fuzzifyOccupancyHigh(occupancyFactor),
    )
    distanceSets = (
        fuzzifyAverageDistanceShort(averageDistance),
        fuzzifyAverageDistanceMedium(averageDistance),
        fuzzifyAverageDistanceLong(averageDistance),
    )
    trafficSets = (
        fuzzifyTrafficIntensityLow(trafficIntensity),
        fuzzifyTrafficIntensityMedium(trafficIntensity),
        fuzzifyTrafficIntensityHigh(trafficIntensity),
    )

    # Output column of each rule, in rule order. Rules are enumerated with
    # traffic intensity varying slowest, then distance, then occupancy.
    outputColumns = (
        0, 0, 0, 0, 0, 0, 1, 1, 0,   # traffic low    (rules 0-8)
        1, 0, 0, 2, 1, 0, 1, 2, 1,   # traffic medium (rules 9-17)
        6, 5, 3, 3, 3, 1, 4, 3, 2,   # traffic high   (rules 18-26)
    )

    rules = [[0] * 7 for _ in range(27)]
    ruleIndex = 0
    for traffic in trafficSets:
        for distance in distanceSets:
            for occupancy in occupancySets:
                strength = min(min(occupancy, distance), traffic)
                rules[ruleIndex][outputColumns[ruleIndex]] = strength
                ruleIndex += 1
    return rules
def fuzzifyOccupancyLow(occupancyFactor):
    """Return the membership of ``occupancyFactor`` in the "low" set.

    Evaluates ``trapmf`` with vertices 0, 0, 20, 40.
    """
    vertices = [0, 0, 20, 40]
    return trapmf(occupancyFactor, vertices)
def fuzzifyOccupancyMedium(occupancyFactor):
    """Return the membership of ``occupancyFactor`` in the "medium" set.

    Evaluates ``trimf`` with vertices 20, 50, 80.
    """
    vertices = [20, 50, 80]
    return trimf(occupancyFactor, vertices)
def fuzzifyOccupancyHigh(occupancyFactor):
    """Return the membership of ``occupancyFactor`` in the "high" set.

    Evaluates ``trapmf`` with vertices 60, 80, 100, 100.
    """
    vertices = [60, 80, 100, 100]
    return trapmf(occupancyFactor, vertices)
def fuzzifyAverageDistanceShort(averageDistance):
    """Return the membership of ``averageDistance`` in the "short" set.

    Evaluates ``trapmf`` with vertices 0, 0, 20, 40.
    """
    vertices = [0, 0, 20, 40]
    return trapmf(averageDistance, vertices)
def fuzzifyAverageDistanceMedium(averageDistance):
    """Return the membership of ``averageDistance`` in the "medium" set.

    Evaluates ``trimf`` with vertices 20, 50, 80.
    """
    vertices = [20, 50, 80]
    return trimf(averageDistance, vertices)
def fuzzifyAverageDistanceLong(averageDistance):
    """Return the membership of ``averageDistance`` in the "long" set.

    Evaluates ``trapmf`` with vertices 60, 80, 100, 100.
    """
    vertices = [60, 80, 100, 100]
    return trapmf(averageDistance, vertices)
def fuzzifyTrafficIntensityLow(trafficIntensity):
    """Return the membership of ``trafficIntensity`` in the "low" set.

    Evaluates ``trapmf`` with vertices 0, 0, 20, 40.
    """
    vertices = [0, 0, 20, 40]
    return trapmf(trafficIntensity, vertices)
def fuzzifyTrafficIntensityMedium(trafficIntensity):
    """Return the membership of ``trafficIntensity`` in the "medium" set.

    Evaluates ``trimf`` with vertices 20, 50, 80.
    """
    vertices = [20, 50, 80]
    return trimf(trafficIntensity, vertices)
def fuzzifyTrafficIntensityHigh(trafficIntensity):
    """Return the membership of ``trafficIntensity`` in the "high" set.

    Evaluates ``trapmf`` with vertices 60, 80, 100, 100.
    """
    vertices = [60, 80, 100, 100]
    return trapmf(trafficIntensity, vertices)
def getVSPlots():
    """Return the sampled VS output set over the range 0..100.

    Built by ``getTrapmfPlots`` from vertices 0, 0, 10, 20 with a left
    shoulder.
    """
    vertices = [0, 0, 10, 20]
    return getTrapmfPlots(0, 100, vertices, shoulder="left")
def getSPlots():
    """Return the sampled S output set over the range 0..100.

    Built by ``getTrimfPlots`` from vertices 15, 25, 35.
    """
    vertices = [15, 25, 35]
    return getTrimfPlots(0, 100, vertices)
def getRSPlots():
    """Return the sampled RS output set over the range 0..100.

    Built by ``getTrimfPlots`` from vertices 30, 35, 45.
    """
    vertices = [30, 35, 45]
    return getTrimfPlots(0, 100, vertices)
def getMPlots():
    """Return the sampled M output set over the range 0..100.

    Built by ``getTrimfPlots`` from vertices 40, 50, 60.
    """
    vertices = [40, 50, 60]
    return getTrimfPlots(0, 100, vertices)
def getRLPlots():
    """Return the sampled RL output set over the range 0..100.

    Built by ``getTrimfPlots`` from vertices 55, 65, 70.
    """
    vertices = [55, 65, 70]
    return getTrimfPlots(0, 100, vertices)
def getLPlots():
    """Return the sampled L output set over the range 0..100.

    Built by ``getTrimfPlots`` from vertices 65, 75, 85.
    """
    vertices = [65, 75, 85]
    return getTrimfPlots(0, 100, vertices)
def getVLPlots():
    """Return the sampled VL output set over the range 0..100.

    Built by ``getTrapmfPlots`` from vertices 80, 90, 100, 100 with a
    right shoulder.
    """
    vertices = [80, 90, 100, 100]
    return getTrapmfPlots(0, 100, vertices, shoulder="right")
# Script entry point: run the traffic controller only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment