Last active
May 10, 2016 20:20
-
-
Save mauroa/ab3c54c6e710fed4dc39 to your computer and use it in GitHub Desktop.
Async Process in C#
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Runtime.InteropServices; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace System.Diagnostics | |
{ | |
/// <summary>
/// Extension methods for <see cref="Process"/>: liveness and parent-process lookup, plus
/// helpers that expose a process' output and lifetime as an observable sequence of strings.
/// </summary>
public static class Extensions
{
    // Marker that ObserveStderr puts in front of stderr text so the merged stream can recognise it.
    const string ErrorPrefix = "Error:";

    // Message pushed into the merged stream once WaitForExit returns.
    const string ExitedMessage = "Process Exited";

    // Time left for pending stdout/stderr messages to be delivered after the exit message.
    static readonly TimeSpan ExitGracePeriod = TimeSpan.FromMilliseconds (1000);

    /// <summary>
    /// Tells whether a process with the same id as <paramref name="process"/> can currently be looked up.
    /// </summary>
    /// <param name="process">The process to check.</param>
    /// <returns>
    /// <c>true</c> when <see cref="Process.GetProcessById(int)"/> succeeds for the process id;
    /// <c>false</c> when reading the id or the lookup throws (for example, never started or already gone).
    /// </returns>
    public static bool IsRunning (this Process process)
    {
        try {
            // The looked-up instance is only a probe; dispose it right away instead of leaving
            // one undisposed Process behind per call.
            using (Process.GetProcessById (process.Id)) {
                return true;
            }
        } catch (Exception) {
            // Deliberately broad: this method is used as a plain predicate and callers rely on
            // it never throwing, whatever the reason the lookup failed.
            return false;
        }
    }

    /// <summary>
    /// Gets the parent of <paramref name="process"/>.
    /// </summary>
    /// <param name="process">The process whose parent is wanted.</param>
    /// <returns>The parent process, or <c>null</c> if the native query fails or the parent cannot be found.</returns>
    public static Process GetParentProcess (this Process process)
    {
        return GetParentProcess (process.Handle);
    }

    /// <summary>
    /// Gets the parent of the process identified by <paramref name="processId"/>.
    /// </summary>
    /// <param name="processId">Id of the process whose parent is wanted.</param>
    /// <returns>The parent process, or <c>null</c> if the native query fails or the parent cannot be found.</returns>
    public static Process GetParentProcess (int processId)
    {
        // The intermediate Process is only needed for its handle; dispose it once the parent is resolved.
        using (var process = Process.GetProcessById (processId)) {
            return GetParentProcess (process.Handle);
        }
    }

    /// <summary>
    /// Starts <paramref name="process"/> with <paramref name="startInfo"/> without cancellation support.
    /// </summary>
    /// <param name="process">The process instance to start.</param>
    /// <param name="startInfo">Start settings to assign to the process.</param>
    /// <returns>An observable of the process' messages; see the overload taking a cancellation token.</returns>
    public static IObservable<string> RunAsync (this Process process, ProcessStartInfo startInfo)
    {
        return process.RunAsync (startInfo, CancellationToken.None);
    }

    /// <summary>
    /// Starts <paramref name="process"/> with <paramref name="startInfo"/> and returns an observable
    /// of its output lines, its stderr content (surfaced as an error) and its exit notification.
    /// </summary>
    /// <param name="process">The process instance to start.</param>
    /// <param name="startInfo">
    /// Start settings to assign to the process. When <c>UseShellExecute</c> is set, both redirection
    /// flags are switched off on this instance.
    /// </param>
    /// <param name="cancellationToken">Token that kills the process when cancelled.</param>
    /// <returns>An observable that begins watching the process when subscribed to.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="process"/> or <paramref name="startInfo"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The process is already running.</exception>
    /// <exception cref="OperationCanceledException">The token was cancelled before or while starting.</exception>
    public static IObservable<string> RunAsync (this Process process, ProcessStartInfo startInfo, CancellationToken cancellationToken)
    {
        if (process == null) {
            throw new ArgumentNullException ("process");
        }

        if (startInfo == null) {
            throw new ArgumentNullException ("startInfo");
        }

        if (process.IsRunning ()) {
            throw new InvalidOperationException ("The process is already running");
        }

        if (startInfo.UseShellExecute) {
            startInfo.RedirectStandardOutput = false;
            startInfo.RedirectStandardError = false;
        }

        process.StartInfo = startInfo;

        cancellationToken.ThrowIfCancellationRequested ();
        process.Start ();

        // Register only after the process exists. If the token was cancelled while starting,
        // Register runs the callback immediately and the fresh process is killed; registering
        // before Start would let that cancellation hit a not-yet-started process and be lost.
        var cancellationRegistration = cancellationToken.Register (() => {
            try {
                process.Kill ();
            } catch (InvalidOperationException) {
                // If the process has already exited this could happen
            }
        });

        if (cancellationToken.IsCancellationRequested) {
            // Nothing will ever subscribe and dispose the registration, so release it here.
            cancellationRegistration.Dispose ();
            cancellationToken.ThrowIfCancellationRequested ();
        }

        return GetProcessObservable (process, cancellationRegistration);
    }

    /// <summary>
    /// Observes a process without starting it.
    /// </summary>
    /// <param name="process">The process to observe.</param>
    /// <returns>
    /// An observable of the process' messages. Disposing a subscription also disposes <paramref name="process"/>.
    /// </returns>
    public static IObservable<string> ObserveAsync (this Process process)
    {
        return GetProcessObservable (process);
    }

    // Builds the process observable with no cancellation registration attached.
    static IObservable<string> GetProcessObservable (Process process)
    {
        return GetProcessObservable (process, default (CancellationTokenRegistration));
    }

    // Builds the observable handed to callers. Each subscription owns the stream subscription,
    // the cancellation registration (when there is one) and the process itself.
    static IObservable<string> GetProcessObservable (Process process, CancellationTokenRegistration cancellationRegistration)
    {
        return Observable.Create<string> (observer => {
            var disposables = new CompositeDisposable ();

            disposables.Add (ObserveProcess (process, observer));

            if (cancellationRegistration != default (CancellationTokenRegistration)) {
                disposables.Add (cancellationRegistration);
            }

            disposables.Add (process);

            return disposables;
        });
    }

    // Merges stdout, stderr and the exit notification and translates them for the observer:
    // blank messages are skipped, stderr content becomes "Process Error" followed by OnError,
    // and the exit message is followed by OnCompleted after a grace period.
    // NOTE(review): messages are told apart by their text, so a stdout line that starts with
    // "Error:" or equals "Process Exited" is treated as the corresponding signal.
    static IDisposable ObserveProcess (Process process, IObserver<string> observer)
    {
        return Observable
            .Merge (ObserveStdout (process), ObserveStderr (process), ObserveExit (process))
            .ObserveOn (NewThreadScheduler.Default)
            .Subscribe (async message => {
                if (string.IsNullOrWhiteSpace (message)) {
                    return;
                }

                // Ordinal comparison: the prefix is a machine marker, not user-facing text.
                if (message.StartsWith (ErrorPrefix, StringComparison.Ordinal)) {
                    observer.OnNext ("Process Error");
                    observer.OnError (new Exception (message.Substring (ErrorPrefix.Length)));
                    return;
                }

                observer.OnNext (message);

                if (message == ExitedMessage) {
                    // Wait just in case the stdout and stderr processing is not yet completed.
                    // The await hands control back so queued messages keep being delivered meanwhile.
                    await Task.Delay (ExitGracePeriod).ConfigureAwait (continueOnCapturedContext: false);
                    observer.OnCompleted ();
                }
            }, ex => {
                observer.OnError (ex);
            });
    }

    // Emits each line of the redirected standard output until the end of the stream.
    // Never emits when stdout is not redirected.
    static IObservable<string> ObserveStdout (Process process)
    {
        if (process.StartInfo.UseShellExecute || !process.StartInfo.RedirectStandardOutput) {
            return Observable.Never<string> ();
        }

        return Observable
            .FromAsync (() => process.StandardOutput.ReadLineAsync ())
            .Repeat ()
            // ReadLineAsync yields null at end of stream. Stopping there, rather than when the
            // process stops running, keeps lines that are still buffered when the process exits.
            .TakeWhile (line => line != null);
    }

    // Emits the whole redirected standard error, prefixed with the error marker, once it has
    // been read to the end; emits nothing if it is blank. Never emits when stderr is not redirected.
    static IObservable<string> ObserveStderr (Process process)
    {
        if (process.StartInfo.UseShellExecute || !process.StartInfo.RedirectStandardError) {
            return Observable.Never<string> ();
        }

        return Observable
            .FromAsync (() => process.StandardError.ReadToEndAsync ())
            .Where (stderr => !string.IsNullOrWhiteSpace (stderr))
            .Select (stderr => ErrorPrefix + stderr);
    }

    // Emits the exit message once WaitForExit returns; the blocking wait runs via Task.Run.
    static IObservable<string> ObserveExit (Process process)
    {
        return Observable
            .FromAsync (() => Task.Run (() => process.WaitForExit ()))
            .Select (_ => ExitedMessage);
    }

    // Resolves the parent of the process behind the given handle through
    // ParentProcessInfo.NtQueryInformationProcess (information class 0).
    // Returns null when the call reports a non-zero status or the parent id cannot be looked up.
    static Process GetParentProcess (IntPtr handle)
    {
        var info = new ParentProcessInfo ();
        int returnLength;
        var status = ParentProcessInfo.NtQueryInformationProcess (handle, 0, ref info, Marshal.SizeOf (info), out returnLength);

        if (status != 0) {
            return null;
        }

        try {
            return Process.GetProcessById (info.InheritedFromUniqueProcessId.ToInt32 ());
        } catch (ArgumentException) {
            // Process not found
            return null;
        }
    }
}
/// <summary>
/// Managed mirror of the native structure filled in by <c>NtQueryInformationProcess</c>,
/// together with the P/Invoke declaration of that function.
/// </summary>
[StructLayout (LayoutKind.Sequential)]
internal struct ParentProcessInfo
{
    // These members must match PROCESS_BASIC_INFORMATION.
    // The layout is sequential, so the declaration order below must not be changed.
    internal IntPtr Reserved1;
    internal IntPtr PebBaseAddress;
    internal IntPtr Reserved2_0;
    internal IntPtr Reserved2_1;
    internal IntPtr UniqueProcessId;
    // Read by the parent-process lookup in this file as the id of the parent process.
    internal IntPtr InheritedFromUniqueProcessId;

    /// <summary>
    /// Queries information about the process behind <paramref name="processHandle"/> from ntdll.dll.
    /// </summary>
    /// <param name="processHandle">Handle of the process to query.</param>
    /// <param name="processInformationClass">Kind of information requested; the caller in this file passes 0.</param>
    /// <param name="processInformation">Structure that receives the result.</param>
    /// <param name="processInformationLength">Size in bytes of <paramref name="processInformation"/>.</param>
    /// <param name="returnLength">Receives the length reported back by the native call.</param>
    /// <returns>The native status code; the caller in this file treats any non-zero value as failure.</returns>
    [DllImport ("ntdll.dll")]
    internal static extern int NtQueryInformationProcess (IntPtr processHandle, int processInformationClass, ref ParentProcessInfo processInformation, int processInformationLength, out int returnLength);
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment