Created
April 2, 2019 22:06
-
-
Save kevmal/d5382e8d2b0059a179341805be2fc2d3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Need packages: | |
// System.Reactive | |
// Trill | |
open Microsoft.StreamProcessing | |
open System | |
open System.Reactive.Linq | |
type ContextSwitch = | |
{ | |
Tick : int64 | |
ProcessId : int64 | |
CpuId : int64 | |
CpuTemp : int64 | |
} | |
let dataStr = "0 1 1 120|0 3 2 121|0 5 3 124|120 2 1 123|300 1 1 122|1800 4 2 125|3540 2 1 119|3600 1 1 120" | |
let data = | |
dataStr.Split('|') | |
|> Array.filter (String.IsNullOrWhiteSpace >> not) | |
|> Array.map (fun x -> x.Trim().Trim('\r').Split ' ') | |
|> Array.map | |
(fun x -> | |
{ | |
Tick = int64 x.[0] | |
ProcessId = int64 x.[1] | |
CpuId = int64 x.[2] | |
CpuTemp = int64 x.[3] | |
} | |
) | |
let contextSwitchObservable = data.ToObservable() | |
let contextSwitchStreamEventObservable = contextSwitchObservable |> Observable.map (fun e -> StreamEvent.CreateInterval(e.Tick, e.Tick + 1L, e)) | |
let contextSwitchIngressStreamable = contextSwitchStreamEventObservable.ToStreamable(DisorderPolicy.Drop()) | |
let contextSwitchStreamable : IStreamable<Empty, ContextSwitch> = contextSwitchIngressStreamable :> _ | |
let passthroughContextSwitchStreamEventObservable = contextSwitchStreamable.ToStreamEventObservable() | |
passthroughContextSwitchStreamEventObservable.Where(fun e -> e.IsData).ForEachAsync(fun e -> Console.WriteLine(e.ToString())).Wait() | |
let contextSwitchTwoCores = contextSwitchStreamable.Where(fun p -> p.CpuId = 1L || p.CpuId = 2L) | |
let contextSwitchTwoCoresNoTemp = contextSwitchTwoCores.Select(fun e -> {| Tick = e.Tick ; ProcessId = e.ProcessId; CpuId = e.CpuId |}) | |
type Process = | |
{ | |
ProcessId : int64 | |
Name : string | |
} | |
let processes = | |
[ | |
{ProcessId = 1L; Name = "Word"} | |
{ProcessId = 2L; Name = "Internet Explorer"} | |
{ProcessId = 3L; Name = "Excel"} | |
{ProcessId = 4L; Name = "Visual Studio"} | |
{ProcessId = 5L; Name = "Outlook"} | |
] | |
let namesStream = processes.ToObservable().Select(fun e -> StreamEvent.CreateInterval(0L,10000L,e)).ToStreamable() | |
let contextSwitchWithNames = | |
contextSwitchTwoCoresNoTemp.Join(namesStream, | |
(fun e -> e.ProcessId), | |
(fun e -> e.ProcessId), | |
(fun l r -> | |
{| Tick = l.Tick; | |
ProcessId = l.ProcessId; | |
CpuId = l.CpuId; | |
Name = r.Name |})) | |
let infiniteContextSwitch = contextSwitchWithNames.AlterEventDuration(StreamEvent.InfinitySyncTime) | |
let clippedContextSwitch = infiniteContextSwitch.Multicast(fun x -> x.ClipEventDuration(infiniteContextSwitch, (fun e -> e.CpuId), (fun e -> e.CpuId))) | |
let shiftedClippedContextSwitch = clippedContextSwitch.ShiftEventLifetime(1L) | |
let timeslices = | |
//shiftedClippedContextSwitch.Join(contextSwitchWithNames, (fun e -> e.CpuId) , (fun e -> e.CpuId), (fun l r -> {|ProcessId = l.ProcessId; CpuId = l.CpuId; Name = l.Name; Timeslice = r.Tick - l.Tick|})) | |
contextSwitchWithNames | |
.AlterEventDuration(StreamEvent.InfinitySyncTime) | |
.Multicast(fun x -> | |
x.ClipEventDuration(infiniteContextSwitch, (fun e -> e.CpuId), (fun e -> e.CpuId)) | |
.ShiftEventLifetime(1L) | |
.Join(contextSwitchWithNames, (fun e -> e.CpuId) , (fun e -> e.CpuId), (fun l r -> {|ProcessId = l.ProcessId; CpuId = l.CpuId; Name = l.Name; Timeslice = r.Tick - l.Tick|})) | |
) | |
let timeslicesForProcess1Cpu1 = timeslices.Where(fun x -> x.CpuId = 1L && x.ProcessId = 1L) | |
let windowedTimeslices = timeslicesForProcess1Cpu1.AlterEventLifetime((fun origStartTime -> (1L + ((origStartTime - 1L) / 3600L)) * 3600L), 3600L) | |
let windowedTimeslicesForProcess1Cpu1 = timeslicesForProcess1Cpu1.HoppingWindowLifetime(3600L, 3600L) | |
let totalConsumptionPerPeriodForProcess1Cpu1 = windowedTimeslicesForProcess1Cpu1.Sum(fun e -> e.Timeslice) | |
let totalConsumptionPerPeriod = timeslices.GroupApply( | |
(fun e -> {| CpuId = e.CpuId; ProcessId = e.ProcessId; Name = e.Name |}), | |
(fun s -> s.HoppingWindowLifetime(3600L, 3600L).Sum(fun e -> e.Timeslice)), | |
(fun g p -> {| CpuId = g.Key.CpuId; Name = g.Key.Name; TotalTime = p |})) | |
totalConsumptionPerPeriodForProcess1Cpu1.ToStreamEventObservable().ToEnumerable() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment