Skip to content

Instantly share code, notes, and snippets.

@blink1073
Last active September 15, 2015 19:48

Revisions

  1. blink1073 revised this gist Sep 15, 2015. 1 changed file with 2 additions and 4 deletions.
    6 changes: 2 additions & 4 deletions kernelfuture.ts
    Original file line number Diff line number Diff line change
    @@ -2,10 +2,9 @@
    /**
    * Implementation of a kernel future.
    */
    class KernelFutureHandler extends DisposableDelegate implements IKernelFuture {
    class KernelFutureHandler implements IKernelFuture {

    constructor(kernel: IKernel, msgId: string, shellPromise: Promise<IKernelMessage>, callback: () => void) {
    super(callback);
    constructor(kernel: IKernel, msgId: string, shellPromise: Promise<IKernelMessage>) {
    this._msgId = msgId;
    this.autoDispose = false;
    this._kernel = kernel;
    @@ -114,7 +113,6 @@ class KernelFutureHandler extends DisposableDelegate implements IKernelFuture {
    this._iopub = null;
    this._reply = null;
    this._done = null;
    super.dispose();
    }

    _handleStdin(msg: IKernelMessage): void {
  2. blink1073 revised this gist Sep 15, 2015. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion kernelfuture.ts
    Original file line number Diff line number Diff line change
    @@ -7,7 +7,7 @@ class KernelFutureHandler extends DisposableDelegate implements IKernelFuture {
    constructor(kernel: IKernel, msgId: string, shellPromise: Promise<IKernelMessage>, callback: () => void) {
    super(callback);
    this._msgId = msgId;
    this.autoDispose = true;
    this.autoDispose = false;
    this._kernel = kernel;
    kernel.iopubReceived.connect(this._handleIOPub, this);
    kernel.stdinReceived.connect(this._handleStdin, this);
  3. blink1073 revised this gist Sep 15, 2015. 1 changed file with 12 additions and 13 deletions.
    25 changes: 12 additions & 13 deletions kernelfuture.ts
    Original file line number Diff line number Diff line change
    @@ -104,6 +104,18 @@ class KernelFutureHandler extends DisposableDelegate implements IKernelFuture {
    this._stdin = cb;
    }

    /**
    * Dispose and unregister the future.
    */
    dispose(): void {
    this._kernel.iopubReceived.disconnect(this._handleIOPub, this);
    this._kernel.stdinReceived.disconnect(this._handleStdin, this);
    this._stdin = null;
    this._iopub = null;
    this._reply = null;
    this._done = null;
    super.dispose();
    }

    _handleStdin(msg: IKernelMessage): void {
    if (!msg.parent_header) {
    @@ -134,19 +146,6 @@ class KernelFutureHandler extends DisposableDelegate implements IKernelFuture {
    }
    }

    /**
    * Dispose and unregister the future.
    */
    dispose(): void {
    this._kernel.iopubReceived.disconnect(this._handleIOPub, this);
    this._kernel.stdinReceived.disconnect(this._handleStdin, this);
    this._stdin = null;
    this._iopub = null;
    this._reply = null;
    this._done = null;
    super.dispose();
    }

    /**
    * Handle a message done status.
    */
  4. blink1073 created this gist Sep 15, 2015.
    194 changes: 194 additions & 0 deletions kernelfuture.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,194 @@

    /**
    * Implementation of a kernel future.
    */
    class KernelFutureHandler extends DisposableDelegate implements IKernelFuture {

    constructor(kernel: IKernel, msgId: string, shellPromise: Promise<IKernelMessage>, callback: () => void) {
    super(callback);
    this._msgId = msgId;
    this.autoDispose = true;
    this._kernel = kernel;
    kernel.iopubReceived.connect(this._handleIOPub, this);
    kernel.stdinReceived.connect(this._handleStdin, this);
    shellPromise.then((msg) => {
    var reply = this._reply;
    if (reply) reply(msg);
    this._setFlag(KernelFutureFlag.GotReply);
    if (this._testFlag(KernelFutureFlag.GotIdle)) {
    this._handleDone(msg);
    }
    });
    }

    /**
    * Get the current autoDispose status of the future.
    */
    get autoDispose(): boolean {
    return this._testFlag(KernelFutureFlag.AutoDispose);
    }

    /**
    * Set the current autoDispose behavior of the future.
    *
    * If True, it will self-dispose() after onDone() is called.
    */
    set autoDispose(value: boolean) {
    if (value) {
    this._setFlag(KernelFutureFlag.AutoDispose);
    } else {
    this._clearFlag(KernelFutureFlag.AutoDispose);
    }
    }

    /**
    * Check for message done state.
    */
    get isDone(): boolean {
    return this._testFlag(KernelFutureFlag.IsDone);
    }

    /**
    * Get the reply handler.
    */
    get onReply(): (msg: IKernelMessage) => void {
    return this._reply;
    }

    /**
    * Set the reply handler.
    */
    set onReply(cb: (msg: IKernelMessage) => void) {
    this._reply = cb;
    }

    /**
    * Get the iopub handler.
    */
    get onIOPub(): (msg: IKernelMessage) => void {
    return this._iopub;
    }

    /**
    * Set the iopub handler.
    */
    set onIOPub(cb: (msg: IKernelMessage) => void) {
    this._iopub = cb;
    }

    /**
    * Get the done handler.
    */
    get onDone(): (msg: IKernelMessage) => void {
    return this._done;
    }

    /**
    * Set the done handler.
    */
    set onDone(cb: (msg: IKernelMessage) => void) {
    this._done = cb;
    }

    /**
    * Get the stdin handler.
    */
    get onStdin(): (msg: IKernelMessage) => void {
    return this._stdin;
    }

    /**
    * Set the stdin handler.
    */
    set onStdin(cb: (msg: IKernelMessage) => void) {
    this._stdin = cb;
    }


    _handleStdin(msg: IKernelMessage): void {
    if (!msg.parent_header) {
    return;
    }
    if (msg.parent_header.msg_id !== this._msgId) {
    return;
    }
    var stdin = this._stdin;
    if (stdin) stdin(msg);
    }

    _handleIOPub(msg: IKernelMessage): void {
    if (!msg.parent_header) {
    return;
    }
    if (msg.parent_header.msg_id !== this._msgId) {
    return;
    }
    var iopub = this._iopub;
    if (iopub) iopub(msg);
    if (msg.header.msg_type === 'status' &&
    msg.content.execution_state === 'idle') {
    this._setFlag(KernelFutureFlag.GotIdle);
    if (this._testFlag(KernelFutureFlag.GotReply)) {
    this._handleDone(msg);
    }
    }
    }

    /**
    * Dispose and unregister the future.
    */
    dispose(): void {
    this._kernel.iopubReceived.disconnect(this._handleIOPub, this);
    this._kernel.stdinReceived.disconnect(this._handleStdin, this);
    this._stdin = null;
    this._iopub = null;
    this._reply = null;
    this._done = null;
    super.dispose();
    }

    /**
    * Handle a message done status.
    */
    private _handleDone(msg: IKernelMessage): void {
    if (this.isDone) {
    return;
    }
    this._setFlag(KernelFutureFlag.IsDone);
    var done = this._done;
    if (done) done(msg);
    this._done = null;
    if (this._testFlag(KernelFutureFlag.AutoDispose)) {
    this.dispose();
    }
    }

    /**
    * Test whether the given future flag is set.
    */
    private _testFlag(flag: KernelFutureFlag): boolean {
    return (this._status & flag) !== 0;
    }

    /**
    * Set the given future flag.
    */
    private _setFlag(flag: KernelFutureFlag): void {
    this._status |= flag;
    }

    /**
    * Clear the given future flag.
    */
    private _clearFlag(flag: KernelFutureFlag): void {
    this._status &= ~flag;
    }

    private _status = 0;
    private _msgId = '';
    private _kernel: IKernel = null;
    private _stdin: (msg: IKernelMessage) => void = null;
    private _iopub: (msg: IKernelMessage) => void = null;
    private _reply: (msg: IKernelMessage) => void = null;
    private _done: (msg: IKernelMessage) => void = null;
    }