Created
August 25, 2024 10:39
-
-
Save mataralhawiti/99a4c68fd95eaa64106aa314d8971d90 to your computer and use it in GitHub Desktop.
Copy files from an SMB share to Google Cloud Storage using pysmb (smb.SMBConnection).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import datetime | |
import logging | |
import sys | |
import tempfile | |
from google.cloud import storage | |
from smb.SMBConnection import SMBConnection | |
import re | |
# Module-wide logger: the root logger, so records from imported libraries
# go through the same configuration.
logger = logging.getLogger()

# Emit INFO-and-above records to stdout, each prefixed with a
# second-resolution timestamp.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(message)s",
)
# Load configuration from 'config.json'. Expected layout:
#
# {
#     "general": {
#         "project_id": "ENV_PROJECT",
#         "location": "ENV_PRO_LOCATION",
#         "bucket_name": "ENV_BUCKET",
#         "days_to_include": 2
#     },
#     "templates": {"name": "xx"}
# }
with open("config.json", "r", encoding="utf-8") as config_file:
    config = json.load(config_file)

# Extract general configuration details
general = config["general"]

# Define GCS variables
days_to_include = general["days_to_include"]  # It can be changed in config.json file
BUCKET_NAME = general["bucket_name"]

# Read the clock once so every derived date refers to the same day, even
# when the module happens to be loaded right around midnight.
_TODAY = datetime.date.today()
CURRENT_DAY = _TODAY.strftime("%Y%m%d")
# One day back, as the name says.
YESTERDAY = (_TODAY - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
# Files last written strictly after this date (YYYY-MM-DD) are copied.
date_to_include = (_TODAY - datetime.timedelta(days=days_to_include)).strftime(
    "%Y-%m-%d"
)
PREFIX = "smb-data"

# Define SMB variables
ROOT_DIR = "MY-SMB-ROOT-DIR-NAME"
def _list_recent_files(connection, service_name, sub_dir="", found=None):
    """Collect paths of recently modified files below ``sub_dir``, depth first.

    Parameters:
    - connection: Connected SMB connection object used for ``listPath`` calls.
    - service_name: Name of the SMB share to list.
    - sub_dir: Directory inside the share to scan (empty string means the share root).
    - found: List that matching paths are appended to; a new list is created when omitted.

    Returns:
    - The list of paths (relative to the share root) of every file whose
      last-write date is strictly later than the module-level ``date_to_include``.
    """
    if found is None:
        found = []

    for entry in connection.listPath(
        service_name=service_name, path=sub_dir, pattern="*"
    ):
        full_path = os.path.join(sub_dir, entry.filename)

        if entry.isDirectory:
            # "." and ".." are listed like ordinary directories; descending
            # into them would recurse forever.
            if entry.filename not in (".", ".."):
                _list_recent_files(connection, service_name, full_path, found)
            continue

        modified_on = datetime.datetime.fromtimestamp(
            entry.last_write_time
        ).strftime("%Y-%m-%d")
        # Both sides are zero-padded YYYY-MM-DD strings, so string order
        # equals chronological order.
        if modified_on > date_to_include:
            found.append(full_path)

    return found


def get_smb(request=None):
    """Copy recently modified files from the SMB share to the GCS bucket.

    Connects to the SMB server described by the ``SMB_USER``, ``SMB_PASS``,
    ``SMB_REMOTE_NAME`` and ``SMB_IP`` environment variables, walks the
    ``ROOT_DIR`` share, and uploads every file newer than ``date_to_include``
    to ``BUCKET_NAME`` under ``PREFIX/CURRENT_DAY/<path inside the share>``.

    A failure while transferring a single file is logged (with traceback) and
    the loop moves on to the next file. Errors raised while connecting or
    listing the share propagate to the caller after the connection is closed.

    Parameters:
    - request: Not used by this function.
      NOTE(review): presumably present so the function can be registered as
      an HTTP-triggered entry point; confirm against the deployment.

    Returns:
    - Number of files selected for copying (files whose transfer failed are
      still counted).
    """
    # Establish an SMB connection using environment variables for credentials
    connection = SMBConnection(
        username=os.environ.get("SMB_USER"),
        password=os.environ.get("SMB_PASS"),
        my_name="gcp",
        remote_name=os.environ.get("SMB_REMOTE_NAME"),  # NetBIOS name
        domain="",
        use_ntlm_v2=True,
    )

    try:
        logger.info("Attempting to connect to the SMB server...")
        connection.connect(ip=os.environ.get("SMB_IP"))

        files_paths = _list_recent_files(connection, ROOT_DIR)
        files_counter = len(files_paths)

        # The bucket handle is created on first use and then shared by all
        # uploads. Creating it inside the per-file try keeps a GCS client
        # failure from aborting the whole run.
        bucket = None

        for file_path in files_paths:
            logger.info("Processing %s", file_path)
            try:
                if bucket is None:
                    bucket = storage.Client().bucket(BUCKET_NAME)

                # Stage the SMB file in a temporary file that is deleted as
                # soon as the upload finishes.
                with tempfile.NamedTemporaryFile() as file_obj:
                    connection.retrieveFile(
                        service_name=ROOT_DIR, path=file_path, file_obj=file_obj
                    )
                    blob = bucket.blob(f"{PREFIX}/{CURRENT_DAY}/{file_path}")
                    # rewind=True: retrieveFile left the file position at the
                    # end of the data it wrote.
                    blob.upload_from_file(file_obj=file_obj, rewind=True)
            except Exception:
                # Best effort per file: record the failure and its traceback,
                # then continue with the next file.
                logger.exception("Error for smb file: %s", file_path)
    finally:
        # Close the SMB connection even if an error occurs
        connection.close()

    return files_counter
def clean_file_path(f):
    """Return a lower-cased, dash-separated version of a file name or path.

    The extension found by ``os.path.splitext`` is kept as is. In the part
    before it, every run of non-word characters (spaces, punctuation, path
    separators, ...) is replaced by a single "-". The combined result is
    lower-cased.

    Parameters:
    - f: File name or path string.

    Returns:
    - The normalized string.
    """
    name, ext = os.path.splitext(f)
    # Raw string: "\W" inside a plain literal is an invalid escape sequence,
    # which newer Python versions warn about.
    name = re.sub(r"\W+", "-", name)
    return (name + ext).lower()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment