Last active
October 22, 2021 11:04
-
-
Save dnene/1995fe2566a8e282552177e93d51fb77 to your computer and use it in GitHub Desktop.
This simulates passenger and cab traffic at the pickup point at the airport
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tech.dnene.trials4.airporttaxi | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.channels.produce | |
import org.slf4j.LoggerFactory | |
import java.time.Duration | |
import java.time.LocalDateTime | |
import java.util.* | |
/* | |
* Dependencies required in build.gradle.kts | |
* | |
* dependencies { | |
* implementation(kotlin("stdlib")) | |
* implementation("org.jetbrains.kotlinx","kotlinx-coroutines-core","1.5.1") | |
* implementation("com.github.javafaker:javafaker:1.0.2") | |
* implementation("org.slf4j", "slf4j-api","1.7.31") | |
* implementation("ch.qos.logback", "logback-classic", "1.2.6") | |
* } | |
* | |
*/ | |
/** | |
* This program simulates the arrival of passengers and cabs at a pickup point at an airport. | |
* It can be used to track assignment delay (for passengers and cabs) under different assumptions of their | |
* arrival frequencies | |
* | |
* Inspired by https://medium.com/@anandpillai/uber-go-scheduling-at-bangalore-airport-a-simulation-using-go-50d94f7fafb9 | |
* | |
*/ | |
data class Passenger(val name: String, val age: Int, val address: String) | |
data class Cab(val registration: String, val driverName: String) | |
data class Assignment(val pas: Passenger, val pasTime: LocalDateTime, val cab: Cab, val cabTime: LocalDateTime) { | |
val assignmentTime = LocalDateTime.now() | |
val pasDelay = Duration.between(pasTime, assignmentTime) | |
val cabDelay = Duration.between(cabTime, assignmentTime) | |
} | |
val log = LoggerFactory.getLogger("simulate") | |
/** | |
* Faker for generating random passenger and cab instances | |
*/ | |
class Faker(val stateCode: String) { | |
private val faker = com.github.javafaker.Faker(Locale("en-IN")) | |
private fun rtoDistrict() = (1..34).random() | |
private fun alphaChars() = List(2) { ('A'..'Z').random() }.joinToString("") | |
private fun registrationNumber(state: String) = "$state-${rtoDistrict()} ${alphaChars()} ${(0..9999).random()}" | |
fun passenger() = Passenger(faker.name().fullName(), (18..78).random(), faker.address().fullAddress()) | |
fun cab() = Cab(registrationNumber(stateCode), faker.name().fullName()) | |
} | |
/** | |
** This is a generator which in turn returns a function. The inner function returns an amount of delay in seconds | |
* | |
* Allows different policies for delayed generation. Implemented just two .. one for a random delay, and one for | |
* a fixed delay. | |
*/ | |
fun rangeDelayGenerator(minSeconds: Int, maxSeconds: Int) = { -> (minSeconds * 1000L..maxSeconds * 1000L).random() } | |
fun fixedDelayGenerator(seconds: Int) = { -> (seconds * 1000L).toLong() } | |
/** | |
* This function is a producer. It implicitly creates a channel, and repeatedly calls the lambda `block` passed to it | |
* interspersed by the delay as specified by the `delayGenerator` function, and then submits the returned value from | |
* the `block` to the new channel it just created. | |
* It returns the receiving end of the channel which a consumer can then monitor | |
*/ | |
suspend fun <T> CoroutineScope.traffic(delayGenerator: () -> Long, block: () -> T) = produce { | |
while (isActive && !isClosedForSend) { | |
delay(delayGenerator()) | |
send(block()) | |
} | |
} | |
/** | |
* A producer which pulls data from one channel, and resubmits it along with timestamp into another channel which is | |
* implicitly created. Returns the receiving end of the channel which a further downstream consumer can use | |
*/ | |
suspend fun <T> CoroutineScope.queuer(incoming: ReceiveChannel<T>) = produce { | |
while (isActive && !isClosedForSend) { | |
val item = incoming.receive(); | |
val now = LocalDateTime.now() | |
log.debug("Queued $item at $now") | |
send(item to now) | |
} | |
} | |
/** | |
* This function pulls one item each from two channels, and when both are received, creates an assignment between them | |
*/ | |
suspend fun assignPassengerToCab( | |
pasQueue: ReceiveChannel<Pair<Passenger, LocalDateTime>>, | |
cabQueue: ReceiveChannel<Pair<Cab, LocalDateTime>>, | |
maxCount: Int, | |
) { | |
var processedCount = 0 | |
while (!pasQueue.isClosedForReceive && !cabQueue.isClosedForReceive && processedCount < maxCount) { | |
// wait until you get a passenger (will return existing one if already queued) | |
val pa = pasQueue.receive() | |
// wait until you get a cab (will return existing one if already queued) | |
val ca = cabQueue.receive() | |
// Create an assignment between the two | |
val a = Assignment(pa.first, pa.second, ca.first, ca.second) | |
log.debug("Assigned ${a.pas.name} -> ${a.cab.registration} with delay ${a.pasDelay}") | |
processedCount++ | |
} | |
log.debug("Maximum assignments done .. exiting") | |
// Once all the required count of assignments are done, start shutting down. First cancel both queues | |
pasQueue.cancel() | |
cabQueue.cancel() | |
} | |
/** | |
* Just a main | |
*/ | |
fun main(args: Array<String>) { | |
val faker = Faker("KA") | |
val maxAssignments = 5 | |
runBlocking { | |
// Create a producer which emits a new passenger based on specified delay policy and posts it to the channel | |
val passengerTraffic = traffic(rangeDelayGenerator(5, 20)) { faker.passenger() } | |
// Create a consumer/producer who notes a new passenger and resubmits them to another channel along with the time | |
val passengerArrivals = queuer(passengerTraffic) | |
// Create a producer which emits a new cab based on specified delay policy and posts it to the channel | |
val cabTraffic = traffic(fixedDelayGenerator(20)) { faker.cab() } | |
// Create a consumer/producer who notes a new cab and resubmits it to another channel along with the time | |
val cabArrivals = queuer(cabTraffic) | |
// Monitor queues and assign passenger to cab based on FIFO | |
assignPassengerToCab(passengerArrivals, cabArrivals, maxAssignments) | |
// The program wont exit yet, need to clean up the open channels | |
listOf(passengerTraffic, cabTraffic, passengerArrivals, cabArrivals).forEach { it.cancel() } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment