This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_get_bucket_contents_and_get_table_details_from_clients__when_real_bucket_exists_and_table_exists__should_return_correctly(): | |
# Arrange | |
s3_util = S3Util(boto3.client('s3')) | |
dynamo_util = DynamoUtil(boto3.client('dynamodb')) | |
# Act | |
r = main.get_bucket_contents_and_get_table_details_from_clients(s3_util, dynamo_util) | |
# Assert - S3 | |
valid_objects = ['object1.txt', 'object2.txt', 'object3.txt'] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_get_bucket_contents_and_get_table_details_from_clients__when_bucket_exists_and_able_exists__should_return_correctly(): | |
# Arrange | |
# Mock the s3_client with a return value on chained method calls | |
mock_s3_util = MagicMock() | |
mock_s3_util.list_objects.return_value = ["Some Object", "Another Object"] | |
mock_dynamo_util = MagicMock() | |
mock_dynamo_util.describe_table.return_value = {"TableName": "string"} | |
# Act | |
r = main.get_bucket_contents_and_get_table_details_from_clients(mock_s3_util, mock_dynamo_util) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_bucket_contents_and_get_table_details(): | |
""" | |
Method 1: Create the clients & class instances inside our method | |
""" | |
# Create Clients | |
s3_client = boto3.client("s3") | |
s3_util = S3Util(s3_client) | |
dynamo_client = boto3.client("dynamo") | |
dynamo_util = DynamoUtil(dynamo_client) | |
# Get bucket contents & table details |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
from python_poetry_template.s3_util_class import S3Util | |
def test_list_objects__when_24_objects_in_pubgoer_bucket__should_return_24_objects(): | |
# Arrange | |
s3_client = boto3.client("s3") | |
# Act |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
class S3Util: | |
def __init__(self): | |
self.s3_client = boto3.client("s3") | |
def list_objects(self, bucket_name, jmes_exp=None): | |
paginator = self.s3_client.get_paginator("list_objects") | |
response = paginator.paginate(Bucket=bucket_name, PaginationConfig={}).build_full_result() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from python_poetry_template.s3_util_class import S3Util | |
from mock import MagicMock | |
def test_list_objects__when_no_objects_in_bucket__should_return_empty_list(): | |
# Arrange | |
# Mock the s3_client with a return value on chained method calls | |
mock_s3_client = MagicMock() | |
mock_s3_client.get_paginator().paginate().build_full_result.return_value = {"Contents": []} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class S3Util:
    """Small helper around an injected S3 client.

    The client is supplied by the caller instead of being created here, so
    a stub or mock can be passed in for tests.
    """

    def __init__(self, s3_client):
        """Keep a reference to the S3 client used by every method."""
        self.s3_client = s3_client

    def list_objects(self, bucket_name, jmes_exp=None):
        """List the objects of ``bucket_name`` across all result pages.

        Args:
            bucket_name: Name of the bucket passed as ``Bucket`` to the
                ``list_objects`` paginator.
            jmes_exp: Optional JMESPath expression. When given, it is
                evaluated through the page iterator's ``search`` method.

        Returns:
            Without ``jmes_exp``: the ``"Contents"`` entry of the merged
            paginator result, or an empty list when that key is absent.
            With ``jmes_exp``: a list of the values produced by the search.
        """
        paginator = self.s3_client.get_paginator("list_objects")
        pages = paginator.paginate(Bucket=bucket_name, PaginationConfig={})

        if jmes_exp:
            # ``search`` is a method of the page iterator; the plain dict
            # returned by ``build_full_result()`` has no such attribute.
            return list(pages.search(jmes_exp))

        # Use .get so a result without a "Contents" key yields [] rather
        # than raising KeyError.
        return pages.build_full_result().get("Contents", [])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
apiVersion: kyverno.io/v1 | |
kind: ClusterPolicy | |
metadata: | |
name: vpa-recommender | |
annotations: | |
policies.kyverno.io/title: VPA Recommender | |
policies.kyverno.io/category: Cluster-Policy | |
policies.kyverno.io/description: >- | |
Creates a VerticalPodAutoscaler resource for Deployments that | |
meet the criteria. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
apiVersion: kyverno.io/v1 | |
kind: ClusterPolicy | |
metadata: | |
name: vpa-recommender | |
annotations: | |
policies.kyverno.io/title: VPA Recommender | |
policies.kyverno.io/category: Cluster-Policy | |
policies.kyverno.io/description: >- | |
Creates a VerticalPodAutoscaler resource with MODE = Off for a Deployment | |
in the namespace of the original resource. As the MODE is Off, this |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
apiVersion: kyverno.io/v1 | |
kind: ClusterPolicy | |
metadata: | |
annotations: | |
policies.kyverno.io/title: "Add Deployment Readiness Probe Timeout" | |
policies.kyverno.io/subject: Deployment | |
policies.kyverno.io/description: "Adding policy to update readiness probe default timeout to 5 seconds. https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/" | |
name: add-pod-readiness-liveness-probe-timeout | |
spec: | |
background: false |
NewerOlder