Skip to content

Instantly share code, notes, and snippets.

@jborean93
Created July 20, 2021 22:54
Show Gist options
  • Save jborean93/a8c5ee3ee702395416d923fad5d21b6c to your computer and use it in GitHub Desktop.
Save jborean93/a8c5ee3ee702395416d923fad5d21b6c to your computer and use it in GitHub Desktop.
PSRP Server written in Python - Experimental
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# PYTHON_ARGCOMPLETE_OK
# Copyright: (c) 2021 Jordan Borean (@jborean93) <[email protected]>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
import argparse
import base64
import datetime
import os
import queue
import re
import socket
import struct
import sys
import textwrap
import threading
import traceback
import typing
import uuid
from xml.etree import ElementTree
import psutil
import psrpcore
try:
import argcomplete
except ImportError:
argcomplete = None
@psrpcore.types.PSType(
    [
        "Microsoft.PowerShell.Commands.WriteErrorException",
        "System.SystemException",
    ]
)
class WriteErrorException(psrpcore.types.NETException):
    """Exception object attached to the error records built by ``_write_error``.

    It is registered with psrpcore under the .NET type names listed in the
    decorator above.
    """

    def __init__(self, message: str) -> None:
        """Create the exception with ``message`` as its ``Message`` property."""
        # -2146233087 is 0x80131501 read as a signed 32-bit integer.
        hresult = -2146233087
        super().__init__(Message=message, HResult=hresult)
class PipeConnection:
    """Listening ``AF_UNIX`` stream socket that serves a single accepted client.

    Used as a context manager: entering binds to ``name``, listens and blocks
    until one client connects; exiting closes both the client connection and
    the listening socket. The same instance can be entered again afterwards.
    """

    def __init__(self, name: str) -> None:
        """Remember the socket path; nothing is bound until ``__enter__``.

        Args:
            name: Filesystem path (pipe name) the socket will be bound to.
        """
        self._name = name
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self._conn: typing.Optional[socket.socket] = None

    def __enter__(self) -> "PipeConnection":
        """Bind, listen and block until a client has connected.

        Raises:
            OSError: If the socket cannot be bound, listened on or accepted.
        """
        if os.name != "nt":
            # Remove a stale socket file left behind by a previous run,
            # otherwise bind() fails with "address already in use".
            try:
                os.unlink(self._name)
            except FileNotFoundError:
                pass

        # Release the socket currently held (created in __init__ or left over
        # from a previous session) before replacing it, so the descriptor is
        # not leaked each time the connection is re-entered.
        self._sock.close()
        self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self._sock.bind(self._name)
            self._sock.listen(1)
            self._conn = self._sock.accept()[0]
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so clean up here.
            self._sock.close()
            raise

        return self

    def __exit__(self, *args: typing.Any) -> None:
        """Close the client connection (if any) and the listening socket."""
        if self._conn:
            self._conn.close()
            self._conn = None
        self._sock.close()

    def read(self, length: int) -> bytes:
        """Receive up to ``length`` bytes; an empty result means the peer closed."""
        conn = self._get_conn()
        return conn.recv(length)

    def send(self, data: bytes) -> None:
        """Send all of ``data`` to the connected client."""
        conn = self._get_conn()
        conn.sendall(data)

    def _get_conn(self) -> socket.socket:
        """Return the accepted client socket.

        Raises:
            Exception: If no client connection is currently open.
        """
        if not self._conn:
            raise Exception("Connection has not been opened")
        return self._conn
class StdioConnection:
    """Connection that exchanges raw bytes over the process's stdin/stdout.

    Entering and exiting the context are no-ops because the standard streams
    are already open and are not owned by this object.
    """

    def __enter__(self) -> "StdioConnection":
        """Return ``self``; there is nothing to open."""
        return self

    def __exit__(self, *args: typing.Any) -> None:
        """Do nothing; the standard streams are left open."""
        pass

    def read(self, length: int) -> bytes:
        """Return the bytes currently available on stdin, at most ``length``.

        ``read1`` is used instead of ``read`` because a buffered ``read(n)``
        blocks until ``n`` bytes have arrived or EOF is reached, which would
        stall on packets shorter than the requested length. An empty result
        means EOF.
        """
        return sys.stdin.buffer.read1(length)

    def send(self, data: bytes) -> None:
        """Write ``data`` to stdout and flush it immediately."""
        out = sys.stdout.buffer
        out.write(data)
        # Without the flush the bytes can sit in the stdout buffer while the
        # peer is waiting for them.
        out.flush()
class OutOfProcTransport:
    """Exchanges newline-terminated XML packets with a PSRP client.

    Incoming packets are read from the connection, parsed and dispatched on
    their tag (``Close``, ``Command``, ``Data`` or ``Signal``). Outgoing
    packets are written through ``_write``, which serialises writers with a
    lock because the runspace and pipeline threads also send through this
    object.
    """

    # Maximum number of bytes requested from the connection per read.
    _BUFFER = 32768

    def __init__(
        self,
        conn: "typing.Union[PipeConnection, StdioConnection]",
    ) -> None:
        """Create the transport together with a fresh server runspace pool.

        Args:
            conn: Connection to serve; it is entered as a context manager by
                ``run``.
        """
        super().__init__()
        self.runspace = RunspaceThread(psrpcore.ServerRunspacePool(), self)
        self.pipelines: typing.Dict[uuid.UUID, PipelineThread] = {}
        self._conn = conn
        self._event_queue: queue.Queue[psrpcore.PSRPEvent] = queue.Queue()
        self._write_lock = threading.Lock()

    def run(self) -> None:
        """Serve the connection until the peer disconnects or a packet fails.

        Received bytes are accumulated and every complete line is processed,
        so packets split across reads and several packets arriving in a
        single read are both handled.
        """
        self.runspace.start()

        pending = bytearray()
        try:
            with self._conn as conn:
                while True:
                    data = conn.read(self._BUFFER)
                    if not data:
                        break
                    self._trace(data)

                    pending += data
                    if not self._process_pending(pending):
                        break
        finally:
            self._release_threads()

    def _process_pending(self, pending: bytearray) -> bool:
        """Process and remove every complete line held in ``pending``.

        Returns:
            ``False`` when a packet failed and the runspace pool was marked
            broken (the caller should stop reading), otherwise ``True``.
        """
        while True:
            end_idx = pending.find(b"\n")
            if end_idx == -1:
                # Only a partial packet is left; wait for more data.
                return True

            raw_element = bytes(pending[:end_idx])
            del pending[: end_idx + 1]
            if not raw_element.strip():
                # A blank line carries no packet.
                continue

            try:
                self._process(raw_element)
            except Exception as e:
                self._report_broken(e)
                return False

    def _report_broken(self, error: Exception) -> None:
        """Mark the runspace pool broken because of ``error`` and send the result.

        Must be called from inside the ``except`` block handling ``error`` so
        that ``traceback.format_exc()`` captures its traceback.
        """
        err = psrpcore.types.ErrorRecord(
            Exception=psrpcore.types.NETException(
                Message=str(error),
                StackTrace=traceback.format_exc(),
            ),
            CategoryInfo=psrpcore.types.ErrorCategoryInfo(
                Category=psrpcore.types.ErrorCategory.ReadError,
                Activity="Parsing PSRP msg",
                Reason="Unknown result",
                TargetName=f"RunspacePool({self.runspace.runspace.runspace_pool_id!s})",
                TargetType=type(self.runspace).__name__,
            ),
            FullyQualifiedErrorId="ProcessRunspaceMessageFailure",
        )
        self.runspace.runspace.set_broken(err)
        resp = self.runspace.runspace.data_to_send()
        if resp:
            self.data(resp)

    def _release_threads(self) -> None:
        """Queue the stop marker for every worker thread still registered.

        Both thread classes return from ``run`` when they dequeue ``None``.
        Without this a peer that disconnects without sending ``Close`` leaves
        them blocked on their queues forever. The extra marker is harmless for
        a thread that has already exited.
        """
        for pipeline in list(self.pipelines.values()):
            pipeline.event_queue.put(None)
        self.runspace.event_queue.put(None)

    @staticmethod
    def _trace(data: bytes) -> None:
        """Print a packet for debugging.

        stderr is used because stdout is the protocol channel when the
        transport runs over ``StdioConnection``. Undecodable bytes are
        replaced so that tracing can never raise.
        """
        print(data.decode(errors="replace"), file=sys.stderr)

    def close_ack(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Send a ``CloseAck`` packet for the pipeline, or the pool if ``None``."""
        self._write(ps_guid_packet("CloseAck", pipeline_id))

    def command_ack(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Send a ``CommandAck`` packet for the pipeline, or the pool if ``None``."""
        self._write(ps_guid_packet("CommandAck", pipeline_id))

    def data(
        self,
        data: "psrpcore.PSRPPayload",
    ) -> None:
        """Send a ``Data`` packet; ``data`` is unpacked into ``ps_data_packet``."""
        self._write(ps_data_packet(*data))

    def data_ack(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Send a ``DataAck`` packet for the pipeline, or the pool if ``None``."""
        self._write(ps_guid_packet("DataAck", pipeline_id))

    def signal_ack(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Send a ``SignalAck`` packet for the pipeline, or the pool if ``None``."""
        self._write(ps_guid_packet("SignalAck", pipeline_id))

    def _write(self, data: bytes) -> None:
        """Trace and send one packet while holding the write lock."""
        with self._write_lock:
            self._trace(data)
            self._conn.send(data)

    def _process(self, data: bytes) -> None:
        """Parse one XML packet and dispatch it on its tag.

        A ``PSGuid`` of all zeros is passed on as ``None``, which the handlers
        treat as addressing the runspace pool rather than a pipeline. Packets
        with any other tag are ignored.
        """
        element = ElementTree.fromstring(data)
        ps_guid: typing.Optional[uuid.UUID] = uuid.UUID(element.attrib["PSGuid"])
        if ps_guid == uuid.UUID(int=0):
            ps_guid = None

        if element.tag == "Close":
            self._process_close(ps_guid)
        elif element.tag == "Command":
            self._process_command(ps_guid)
        elif element.tag == "Data":
            self._process_data(element.text, element.attrib.get("Stream", ""), ps_guid)
        elif element.tag == "Signal":
            self._process_signal(ps_guid)

    def _process_close(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Close and join the addressed pipeline (or the runspace), then ack."""
        if pipeline_id:
            pipeline = self.pipelines.pop(pipeline_id)
            pipeline.close()
            pipeline.join()
        else:
            self.runspace.close()
            self.runspace.join()

        self.close_ack(pipeline_id)

    def _process_command(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Register and start a thread for a new pipeline, then ack."""
        if pipeline_id:
            pipeline = psrpcore.ServerPipeline(self.runspace.runspace, pipeline_id)
            t = self.pipelines.setdefault(pipeline_id, PipelineThread(pipeline, self))
            t.start()

        self.command_ack(pipeline_id)

    def _process_data(
        self,
        data: typing.Optional[str],
        stream_type: str,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Feed a base64 payload to the runspace pool and route the events.

        Every event produced is queued on the thread of the pipeline named by
        the packet, or on the runspace thread when no pipeline is named.
        """
        psrp_data = base64.b64decode(data) if data else b""
        st = psrpcore.StreamType.prompt_response if stream_type == "PromptResponse" else psrpcore.StreamType.default
        self.runspace.runspace.receive_data(psrpcore.PSRPPayload(psrp_data, st, pipeline_id))

        while True:
            event = self.runspace.runspace.next_event()
            if not event:
                break

            if pipeline_id:
                self.pipelines[pipeline_id].event_queue.put(event)
            else:
                self.runspace.event_queue.put(event)

    def _process_signal(
        self,
        pipeline_id: typing.Optional[uuid.UUID] = None,
    ) -> None:
        """Ask the addressed pipeline to stop, then ack."""
        if pipeline_id:
            pipeline = self.pipelines[pipeline_id]
            pipeline.stop()

        self.signal_ack(pipeline_id)
class RunspaceThread(threading.Thread):
    """Thread that consumes the events queued for the runspace pool."""

    def __init__(
        self,
        runspace: psrpcore.ServerRunspacePool,
        transport: OutOfProcTransport,
    ) -> None:
        """Store the pool and transport and create the event queue.

        Args:
            runspace: The server runspace pool this thread services.
            transport: Transport used to send acks and outgoing data.
        """
        super().__init__(name="runspace")
        self.runspace = runspace
        self.transport = transport
        # A ``None`` entry is the marker that ends ``run``.
        self.event_queue: queue.Queue[typing.Optional[psrpcore.PSRPEvent]] = queue.Queue()

    def run(self) -> None:
        """Handle queued events until a falsy entry is dequeued."""
        next_event = self.event_queue.get

        event = next_event()
        while event:
            if isinstance(event, psrpcore.InitRunspacePoolEvent):
                # Acknowledge the data packet first, then send the pool's reply.
                self.transport.data_ack()
                self._send_pending()
            event = next_event()

    def close(self) -> None:
        """Close the pool, send what that produced and end the thread loop."""
        self.runspace.close()
        self._send_pending()
        self.event_queue.put(None)

    def _send_pending(self) -> None:
        """Send whatever the pool has ready to go out, if anything."""
        outgoing = self.runspace.data_to_send()
        if outgoing:
            self.transport.data(outgoing)
class PipelineThread(threading.Thread):
    """Thread that consumes the events of one pipeline and runs its script.

    The first command of a created pipeline is treated as Python source and
    executed with ``exec``. The script sees ``print``, ``write_output`` and
    ``write_error`` helpers that forward to the pipeline.
    """

    def __init__(
        self,
        pipeline: psrpcore.ServerPipeline,
        transport: OutOfProcTransport,
    ) -> None:
        """Store the pipeline and transport and create the event queue.

        Args:
            pipeline: The server pipeline this thread services.
            transport: Transport used to send outgoing data.
        """
        super().__init__(name=f"pipeline-{pipeline.pipeline_id!s}")
        self.pipeline = pipeline
        self.transport = transport
        # A ``None`` entry is the marker that ends ``run``.
        self.event_queue: queue.Queue[typing.Optional[psrpcore.PSRPEvent]] = queue.Queue()
        self._worker: typing.Optional[threading.Thread] = None

    @property
    def runspace(self) -> psrpcore.ServerRunspacePool:
        """The runspace pool the pipeline belongs to."""
        return self.pipeline.runspace_pool

    @property
    def ps_host(self) -> psrpcore.types.HostInfo:
        """The pipeline's host info, or the pool's when ``UseRunspaceHost`` is set."""
        pipeline_host = typing.cast(psrpcore.PowerShell, self.pipeline.metadata).host
        if pipeline_host.UseRunspaceHost:
            return typing.cast(psrpcore.types.HostInfo, self.runspace.host)
        else:
            return pipeline_host

    def run(self) -> None:
        """Handle queued events until a falsy entry is dequeued."""
        while True:
            event = self.event_queue.get()
            if not event:
                return

            if isinstance(event, psrpcore.CreatePipelineEvent):
                self.start_pwsh_pipeline(event.pipeline)

            # Send anything the pool has pending; a no-op when there is none.
            self._send_data()

    def start_pwsh_pipeline(
        self,
        info: psrpcore.PowerShell,
    ) -> None:
        """Run the text of the pipeline's first command as Python code.

        Args:
            info: Pipeline description; only ``commands[0].command_text`` is
                used.
        """
        script = textwrap.dedent(self._strip_here_string(info.commands[0].command_text))
        self._worker = threading.Thread(
            name=f"pipeline-{self.pipeline.pipeline_id!s}-worker",
            target=self._exec,
            args=(script,),
        )
        # NOTE: ``run()`` (not ``start()``) executes ``_exec`` synchronously on
        # the calling thread; no additional thread is spawned.
        self._worker.run()

    @staticmethod
    def _strip_here_string(text: str) -> str:
        """Remove one leading ``@'``/``@"`` and one trailing ``'@``/``"@``.

        Only the exact two-character delimiters are removed. ``str.lstrip`` and
        ``str.rstrip`` treat their argument as a set of characters, so using
        them here would also eat quotes or ``@`` signs that belong to the
        script itself (for example the closing quote of ``x = "a"``).
        """
        for opener in ("@'", '@"'):
            if text.startswith(opener):
                text = text[len(opener) :]
                break
        for closer in ("'@", '"@'):
            if text.endswith(closer):
                text = text[: -len(closer)]
                break
        return text

    def _exec(self, code: str) -> None:
        """Execute ``code`` and move the pipeline to its final state.

        ``SystemExit`` (raised by the helpers when the pipeline is stopping)
        stops the pipeline. Any other exception raised by the script is
        reported to the client as an error record and the pipeline is then
        completed, instead of the exception killing the thread and leaving
        the pipeline unfinished.
        """
        self.pipeline.start()
        _ = self.runspace.data_to_send()

        exec_globals = {
            "print": self._print,
            "write_output": self._write_output,
            "write_error": self._write_error,
        }
        try:
            try:
                # SECURITY: this runs code supplied by the connected client
                # inside this process; only serve peers that are trusted.
                exec(code, exec_globals)
            except Exception:
                # _write_error raises SystemExit itself when the pipeline is
                # stopping, which the outer handler turns into a stop.
                self._write_error(traceback.format_exc())
        except SystemExit:
            self.pipeline.stop()
        else:
            self.pipeline.complete()

        self._send_data()

    def _print(self, msg: str) -> None:
        """``print`` replacement: write ``msg`` as a line through a host call.

        Emits an error record instead when the host has no UI.
        """
        if self.pipeline.state == psrpcore.types.PSInvocationState.Stopping:
            sys.exit()

        if self.ps_host.IsHostUINull:
            self._write_error("Cannot write line without host UI")
            return

        self.pipeline.host_call(psrpcore.types.HostMethodIdentifier.WriteLine2, [msg])
        self._send_data()

    def _write_output(self, data: typing.Any) -> None:
        """Write ``data`` to the pipeline output and send it."""
        if self.pipeline.state == psrpcore.types.PSInvocationState.Stopping:
            sys.exit()

        self.pipeline.write_output(data)
        self._send_data()

    def _write_error(
        self,
        message: str,
        category: psrpcore.types.ErrorCategory = psrpcore.types.ErrorCategory.NotSpecified,
        error_id: str = "Microsoft.PowerShell.Commands.WriteErrorException",
        target_object: typing.Any = None,
        recommended_action: typing.Optional[str] = None,
        category_reason: str = "WriteErrorException",
        category_target_name: typing.Optional[str] = None,
        category_target_type: typing.Optional[str] = None,
    ) -> None:
        """Write an error record to the pipeline and send it.

        Args:
            message: Text of the exception (and of the error details, if any).
            category: Error category stored in the category info.
            error_id: Fully qualified error id of the record.
            target_object: Object the error relates to. When given it supplies
                the category target name/type that were not passed explicitly.
            recommended_action: When truthy, error details carrying this text
                are attached.
            category_reason: Reason stored in the category info.
            category_target_name: Explicit category target name.
            category_target_type: Explicit category target type.

        Raises:
            SystemExit: If the pipeline is currently stopping.
        """
        if self.pipeline.state == psrpcore.types.PSInvocationState.Stopping:
            sys.exit()

        exception = WriteErrorException(message)

        cat_target_name = category_target_name
        cat_target_type = category_target_type
        if target_object is not None:
            # Fall back to the target object for whatever was not given.
            if cat_target_name is None:
                cat_target_name = str(target_object)
            if cat_target_type is None:
                cat_target_type = type(target_object).__name__

        cat_info = psrpcore.types.ErrorCategoryInfo(
            Category=category,
            Activity="Write-Error",
            Reason=category_reason,
            TargetName=cat_target_name,
            TargetType=cat_target_type,
        )

        error_details = None
        if recommended_action:
            # NOTE(review): the keyword is spelled "RecomendedAction" (single
            # "m"); confirm against the psrpcore ErrorDetails property name.
            error_details = psrpcore.types.ErrorDetails(Message=message, RecomendedAction=recommended_action)

        self.pipeline.write_error(
            exception=exception,
            category_info=cat_info,
            target_object=target_object,
            fully_qualified_error_id=error_id,
            error_details=error_details,
        )
        self._send_data()

    def close(self) -> None:
        """Close the pipeline, discard the resulting data and end the loop."""
        self.pipeline.close()
        _ = self.runspace.data_to_send()
        self.event_queue.put(None)

    def stop(self) -> None:
        """Begin stopping the pipeline; the resulting data is discarded."""
        self.pipeline.begin_stop()
        _ = self.runspace.data_to_send()

    def _send_data(self) -> None:
        """Send whatever the pool has ready to go out, if anything."""
        data = self.runspace.data_to_send()
        if data:
            self.transport.data(data)
def get_pipe_name() -> str:
    """Build the default pwsh host pipe name for the current process.

    Returns:
        A ``\\\\.\\pipe\\PSHost...`` name on Windows, otherwise the path of a
        ``CoreFxPipe_PSHost...`` file under ``$TMPDIR`` (``/tmp`` when unset).
    """
    pid = os.getpid()
    proc = psutil.Process(pid)
    process_name = proc.name()

    utc = datetime.timezone.utc
    unix_epoch = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=utc)

    # The creation time is turned into a naive local datetime first, then made
    # timezone-aware and converted to UTC before measuring the time elapsed
    # since the Unix epoch.
    created_local = datetime.datetime.fromtimestamp(proc.create_time()).astimezone()
    since_epoch = created_local.astimezone(utc) - unix_epoch
    elapsed_us = since_epoch // datetime.timedelta(microseconds=1)

    # 116444736000000000 is the Unix epoch expressed as a FileTime; FileTime
    # counts 100ns intervals, hence the factor of 10 on the microseconds.
    start_time_ft = 116444736000000000 + elapsed_us * 10

    if os.name == "nt":
        return f"\\\\.\\pipe\\PSHost.{start_time_ft!s}.{pid}.DefaultAppDomain.{process_name}"

    # .NET does `.ToString("X8").Substring(1, 8)`: upper-case hex without
    # leading zeros, then the 8 characters that follow the first one.
    hex_digits = struct.pack(">Q", start_time_ft).hex().upper().lstrip("0")
    start_time = hex_digits[1:9]
    tmpdir = os.environ.get("TMPDIR", "/tmp")
    return os.path.join(tmpdir, f"CoreFxPipe_PSHost.{start_time}.{pid}.None.{process_name}")
def ps_data_packet(
    data: bytes,
    stream_type: psrpcore.StreamType = psrpcore.StreamType.default,
    ps_guid: typing.Optional[uuid.UUID] = None,
) -> bytes:
    """Wrap PSRP fragments in a ``<Data>`` XML packet.

    Args:
        data: The PSRP fragments; they are base64 encoded into the element.
        stream_type: ``StreamType.default`` yields the ``Default`` stream,
            anything else yields ``PromptResponse``.
        ps_guid: Pipeline UUID, or ``None`` for the all-zero UUID that targets
            the RunspacePool.

    Returns:
        bytes: The newline-terminated XML packet.
    """
    if not ps_guid:
        ps_guid = uuid.UUID(int=0)

    if stream_type == psrpcore.StreamType.default:
        stream_name = b"Default"
    else:
        stream_name = b"PromptResponse"

    guid_bytes = str(ps_guid).lower().encode()
    payload = base64.b64encode(data)
    return b"".join(
        [
            b"<Data Stream='",
            stream_name,
            b"' PSGuid='",
            guid_bytes,
            b"'>",
            payload,
            b"</Data>\n",
        ]
    )
def ps_guid_packet(
    element: str,
    ps_guid: typing.Optional[uuid.UUID] = None,
) -> bytes:
    """Build a payload-free XML packet that carries only a ``PSGuid``.

    Args:
        element: The element name, e.g. DataAck, Command, CommandAck, Close,
            CloseAck, Signal or SignalAck.
        ps_guid: Pipeline UUID, or ``None`` for the all-zero UUID that targets
            the RunspacePool.

    Returns:
        bytes: The newline-terminated XML packet.
    """
    if not ps_guid:
        ps_guid = uuid.UUID(int=0)

    guid_text = str(ps_guid).lower()
    packet = f"<{element} PSGuid='{guid_text}' />\n"
    return packet.encode()
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    ``--pipe`` and ``--stdio`` are mutually exclusive. Shell completion is
    enabled when ``argcomplete`` was importable.
    """
    parser = argparse.ArgumentParser(description="Starts a Python PSRP Server.")

    # Each entry is (option string, keyword arguments for add_argument).
    transport_options = [
        (
            "--pipe",
            {
                "dest": "pipe",
                "default": "",
                "type": str,
                "help": "The name of the pipe to communicate with (defaults to PowerShell default for a process).",
            },
        ),
        (
            "--stdio",
            {
                "dest": "stdio",
                "action": "store_true",
                "help": "Use stdout/stdin for communication rather than a pipe.",
            },
        ),
    ]

    transport_group = parser.add_mutually_exclusive_group()
    for flag, options in transport_options:
        transport_group.add_argument(flag, **options)

    if argcomplete:
        argcomplete.autocomplete(parser)

    return parser.parse_args()
def main() -> None:
    """Start the server on the transport selected on the command line.

    With ``--stdio`` a single session is served over stdin/stdout and the
    function returns when it ends. Otherwise sessions are served over the
    pipe forever, one after another.

    The process id is printed first. In stdio mode it goes to stderr because
    stdout carries the protocol data there.
    """
    args = parse_args()

    if args.stdio:
        print(os.getpid(), file=sys.stderr)
        # Previously the serving loop was skipped entirely for --stdio, so
        # the server exited without handling anything.
        OutOfProcTransport(StdioConnection()).run()
        return

    print(os.getpid())
    pipe_name = args.pipe or get_pipe_name()
    conn = PipeConnection(pipe_name)
    while True:
        # A new transport (and runspace pool) is created for every client.
        transport = OutOfProcTransport(conn)
        transport.run()


if __name__ == "__main__":
    main()
@QuangLe1997
Copy link

How can I get the list of shares on the server? Can you help me?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment