Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save egordm/f38b11143d5b1aa5f2680f32981ac6f4 to your computer and use it in GitHub Desktop.
Save egordm/f38b11143d5b1aa5f2680f32981ac6f4 to your computer and use it in GitHub Desktop.
Luminis Blog — Deploying SageMaker Pipelines Using CDK
# Define the pipeline (this step uploads required code and packages by the pipeline to S3)
pipeline = pipeline_factory.create(
pipeline_name=pipeline_name,
role=sm_execution_role_arn,
sm_session=sagemaker_session,
)
pipeline_def_json = json.dumps(json.loads(pipeline.definition()), indent=2, sort_keys=True)
def create_pipeline_resource(
self,
pipeline_name: str,
pipeline_factory: SagemakerPipelineFactory,
sources_bucket_name: str,
sm_execution_role_arn: str,
) -> Tuple[sm.CfnPipeline, str]:
...
# Define the pipeline (this step uploads required code and packages by the pipeline to S3)
...
# Define CloudFormation resource for the pipeline, so it can be deployed to your account
pipeline_cfn = sm.CfnPipeline(
self,
id=f"SagemakerPipeline-{pipeline_name}",
pipeline_name=pipeline_name,
pipeline_definition={"PipelineDefinitionBody": pipeline_def_json},
role_arn=sm_execution_role_arn,
)
arn = self.format_arn(
service='sagemaker',
resource='pipeline',
resource_name=pipeline_cfn.pipeline_name,
)
return pipeline_cfn, arn
# Load infrastructure stack outputs as value parameters (resolved at cdk deploy time)
sources_bucket_name = ssm.StringParameter.value_from_lookup(
self, f"/{self.prefix}/SourcesBucketName")
sm_execution_role_arn = ssm.StringParameter.value_from_lookup(
self, f"/{self.prefix}/SagemakerExecutionRoleArn")
# Create a configured pipeline
self.example_pipeline, self.example_pipeline_arn = self.create_pipeline_resource(
pipeline_name='example-pipeline',
pipeline_factory=ExamplePipeline(
pipeline_config_parameter="Hello world!"
),
sources_bucket_name=sources_bucket_name,
sm_execution_role_arn=sm_execution_role_arn,
)
class SagemakerPipelineFactory(BaseModel):
"""Base class for all pipeline factories."""
@abstractmethod
def create(
self,
role: str,
pipeline_name: str,
sm_session: sagemaker.Session,
) -> Pipeline:
raise NotImplementedError
class ExamplePipeline(SagemakerPipelineFactory):
pipeline_config_parameter: str
def create(
self,
role: str,
pipeline_name: str,
sm_session: sagemaker.Session,
) -> Pipeline:
...
...
# Use the SKLearn image provided by AWS SageMaker
image_uri = sagemaker.image_uris.retrieve(
framework="sklearn",
region=sm_session.boto_region_name,
version="0.23-1",
)
# Create a ScriptProcessor and add code / run parameters
processor = ScriptProcessor(
image_uri=image_uri,
command=["python3"],
instance_type=instance_type_var,
instance_count=1,
role=role,
sagemaker_session=sm_session,
)
processing_step = ProcessingStep(
name="processing-example",
step_args=processor.run(
code="pipelines/sources/example_pipeline/evaluate.py",
),
job_arguments=[
"--config_parameter", self.pipeline_config_parameter
],
inputs=[],
outputs=[]
)
return Pipeline(
name=pipeline_name,
steps=[processing_step],
sagemaker_session=sm_session,
parameters=[instance_type_var],
)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--config_parameter", type=str)
args = parser.parse_args()
print(f"Hello {args.config_parameter}!")
cd ./data_project
cdk deploy
cd ./infrastructure_project
cdk deploy
role = iam.Role(
self, 'SagemakerExecutionRole',
assumed_by=iam.ServicePrincipal('sagemaker.amazonaws.com'),
role_name=f"{self.prefix}-sm-execution-role",
managed_policies=[
iam.ManagedPolicy.from_managed_policy_arn(
self,
id="SagemakerFullAccess",
managed_policy_arn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
),
],
)
self.sm_sources_bucket = s3.Bucket(
self,
id="SourcesBucket",
bucket_name=f"{self.prefix}-sm-sources",
lifecycle_rules=[],
versioned=False,
removal_policy=cdk.RemovalPolicy.DESTROY,
auto_delete_objects=True,
# Access
access_control=s3.BucketAccessControl.PRIVATE,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
public_read_access=False,
object_ownership=s3.ObjectOwnership.OBJECT_WRITER,
enforce_ssl=True,
# Encryption
encryption=s3.BucketEncryption.S3_MANAGED,
)
# Grant read access to SageMaker execution role
self.sm_sources_bucket.grant_read(self.sm_execution_role)
# Grant read/write access to SageMaker execution role
self.sm_data_bucket.grant_read_write(self.sm_execution_role)
# Fetch VPC information
vpc_name = self.node.try_get_context("vpc_name")
self.vpc = ec2.Vpc.from_lookup(
self, id="ImportedVpc",
vpc_name=vpc_name if vpc_name else f"{self.prefix}-vpc"
)
public_subnet_ids = [public_subnet.subnet_id for public_subnet in self.vpc.public_subnets]
# Create SageMaker Studio domain
self.domain = sm.CfnDomain(
self, "SagemakerDomain",
auth_mode='IAM',
domain_name=f'{self.prefix}-SG-Project',
default_user_settings=sm.CfnDomain.UserSettingsProperty(
execution_role=self.sm_execution_role.role_arn
),
app_network_access_type='PublicInternetOnly',
vpc_id=self.vpc.vpc_id,
subnet_ids=public_subnet_ids,
tags=[cdk.CfnTag(
key="project",
value="example-pipelines"
)],
)
# Create SageMaker Studio default user profile
self.user = sm.CfnUserProfile(
self, 'SageMakerStudioUserProfile',
domain_id=self.domain.attr_domain_id,
user_profile_name='default-user',
user_settings=sm.CfnUserProfile.UserSettingsProperty()
)
vpc = ec2.Vpc(
self,
id="VpcConstruct",
ip_addresses=ec2.IpAddresses.cidr("10.0.0.0/16"),
vpc_name=f"{self.prefix}-vpc",
max_azs=3,
nat_gateways=1,
subnet_configuration=[
ec2.SubnetConfiguration(
cidr_mask=24,
name="Public",
subnet_type=ec2.SubnetType.PUBLIC,
),
ec2.SubnetConfiguration(
cidr_mask=23,
name="Private",
subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
),
ec2.SubnetConfiguration(
cidr_mask=24,
name="Isolated",
subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
),
],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment