Skip to content

Instantly share code, notes, and snippets.

@threevi
Last active May 24, 2021 22:22
Show Gist options
  • Save threevi/638f769e9e07f0aa79163b17f1b3fd55 to your computer and use it in GitHub Desktop.
Save threevi/638f769e9e07f0aa79163b17f1b3fd55 to your computer and use it in GitHub Desktop.
Useful Python 3 Code Snippets

Python3 Snippets

Make HTTP(S) Request

import http.client
import json

connection = http.client.HTTPSConnection('api.github.com')

headers = {'Content-type': 'application/json'}

foo = {'text': 'Hello world github/linguist#1 **cool**, and #1!'}
json_foo = json.dumps(foo)

connection.request('POST', '/markdown', json_foo, headers)

response = connection.getresponse()
print(response.read().decode())

Run code at script exit

Method 1

  • Works great for normal termination of the script, but it won't get called in all cases (e.g. fatal internal errors).
import atexit

def exit_handler():
    print 'My application is ending!'

atexit.register(exit_handler)

Method 2

def main():
    try:
        execute_app()
    finally:
        handle_cleanup()

if __name__=='__main__':
    main()

Make insecure HTTPS request

import http.client
import ssl

http.client.HTTPSConnection("www.google.com", context=ssl._create_unverified_context())

Basic ArgParse example

def getopts():
    parser = argparse.ArgumentParser()

    parser.add_argument("-u", "--username",
            default=None,
            help="Docker UCP login username")

    parser.add_argument("-p", "--password",
            default=None,
            help="Docker UCP login password")

    parser.add_argument("-i", "--insecure",
            action="store_true",
            help="Allow insecure HTTPS connection to Docker UCP")

    parser.add_argument("-v", "--verbose",
            action="store_true",
            help="Enable debug level logging")

    parser.add_argument("ucp_url", default=None,
            help="Docker UCP URL")
    
    return parser.parse_args()

Basic Logging

logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('>>> %(asctime)s  [%(levelname)s]  %(message)s'))
logger.addHandler(handler)

if opts.verbose:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

Get Basic Date and Time

import datetime
now = datetime.datetime.now()

now_str = str(now)

Editing Global Variables

global_var = 1

def example1():
    # global keyword is not needed, local_var will be set to 1.
    local_var = global_var

def example2():
    # global keyword is needed, if you want to set global_var,
    # otherwise you will create a local variable.
    global_var = 2

def example3():
    # Without using the global keyword, this is an error.
    # It's an attempt to reference a local variable that has not been declared.
    global_var += 1

Creating Directories (Even if they already exist)

os.makedirs("path/to/directory", exist_ok=True)

Write Text to a File

with open("Output.txt", "w") as text_file:
    text_file.write("Purchase Amount: %s" % TotalAmount)

Download File from the Internet

import urllib.request
import shutil

# Download the file from `url` and save it locally under `file_name`:
with urllib.request.urlopen(url) as response, open(file_name, 'wb') as out_file:
    shutil.copyfileobj(response, out_file)

Download File from the Internet with Suggested Filename

with urllib.request.urlopen(url) as response:
    header = response.headers["Content-Disposition"]
    filename = header.split('=')[-1].replace('"','')

    with open(filename, 'wb') as out_file:
      shutil.copyfileobj(response, out_file)

Natural Sort of Strings in List (Human Sort)

import re

def natural_sort(l):
    convert = lambda text: int(text) if text.isdigit() else text.lower()
    alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ]
    return sorted(l, key = alphanum_key)

POST with urllib

import json
import sys

from urllib import request

data = {"user": "greg", "id": "a34001"}
data_json = json.dumps(data)

req = request.Request("https://address.tld/api/user",
    data=bytes(data_json, encoding="utf-8"),
    headers={"Content-Type": "application/json"}, method="POST")

token = None
with request.urlopen(req) as resp:
    if resp.status == 200:
        token_json = json.loads(resp.read())
        token = token_json["token"]
    else:
        print("ERROR")
        sys.exit(1)

POST with urllib and Skip SSL Validation

import json
import ssl
import sys

from urllib import request

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

data = {"user": "greg", "id": "a34001"}
data_json = json.dumps(data)

req = request.Request("https://address.tld/api/user",
    data=bytes(data_json, encoding="utf-8"),
    headers={"Content-Type": "application/json"}, method="POST")

token = None
with request.urlopen(req, context=ctx) as resp:
    if resp.status == 200:
        token_json = json.loads(resp.read())
        token = token_json["token"]
    else:
        print("ERROR")
        sys.exit(1)

Download a File with urllib

import shutil

from urllib import request

auth_token = "614f1694-ac44-447c-8691-c52c2f8d5f21"
dl_zip_req = request.Request("https://address.tld/api/clientbundle",
    headers={"Authorization": "Bearer {}".format(auth_token)}, method="POST")

with request.urlopen(dl_zip_req) as resp, open("client-bundle.zip", "wb") as f:
    shutil.copyfileobj(resp, f)

Unzip Zip File

import os
import sys
import zipfile

if not os.path.exists("client-bundle.zip"):
    print("ERROR: Zip file is missing")
    sys.exit(1)

if zipfile.is_zipfile("client-bundle.zip"):
    with zipfile.ZipFile("client-bundle.zip", 'r') as f:
        f.extractall()
else:
    print("ERROR: Not a valid zip file")
    sys.exit(1)

Get Password Semi-Securely

import getpass

password = getpass.getpass(prompt="Enter Password: ")

Multiprocessing with Process Communication

import multiprocessing                                                          
import time                                                                     
import uuid                                                                     
                                                                                
from collections import namedtuple                                              
                                                                                
Message = namedtuple("Message", ["type", "msg"])                                
                                                                                
class WaitGroup:                                                                
    def __init__(self):                                                         
        self.procs = {}                                                         
                                                                                
    def register(self, proc_id, process):                                       
        self.procs[proc_id] = process                                           
                                                                                
    def deregister(self, proc_id):                                              
        self.procs.pop(proc_id)                                                 
                                                                                
    def active(self):                                                           
        if len(self.procs) != 0:                                                
            return True                                                         
        else:                                                                   
            return False                                                        
                                                                                
def waiter(wait, queue):                                                        
    time.sleep(wait)                                                            
    queue.put(Message(type="PRINT", msg=f"I'm awake after {wait} seconds!"))    
    queue.put(Message(type="KILL", msg=multiprocessing.current_process().name)) 
                                                                                
def runner(queue):                                                              
    wg = WaitGroup()                                                            
                                                                                
    for i in range(6):                                                          
        proc_id = uuid.uuid4()                                                  
                                                                                
        process = multiprocessing.Process(name=proc_id, target=waiter, args=(i, queue))
        process.daemon = True                                                   
                                                                                
        wg.register(proc_id, process)                                           
                                                                                
        process.start()                                                         
                                                                                
    print("This is code that should run first")                                 
    print(multiprocessing.active_children())                                    
                                                                                
    while wg.active():                                                          
        msg = queue.get()                                                       
        if msg.type == "KILL":                                                  
            wg.deregister(msg.msg)                                              
            print(f"Killing proc {msg.msg}")                                    
        else:                                                                   
            print(msg.msg)                                                      
                                                                                
    print("Done with processes")                                                
                                                                                
    queue.close()                                                               
    queue.join_thread()                                                         
                                                                                
queue = multiprocessing.Queue()                                                 
runner(queue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment