Created
December 3, 2023 04:05
-
-
Save hiroshil/b17201818fbe46a805d47c07b5ac0133 to your computer and use it in GitHub Desktop.
Upload files and folders to SharePoint Online/OneDrive for Business using Office365-REST-Python-Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import math
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.runtime.client_request_exception import ClientRequestException
from office365.sharepoint.client_context import ClientContext
# --- Connection settings ---------------------------------------------------
baseurl = "https://test-my.sharepoint.com"  # SharePoint root URL
basesite = "admin_test_onmicrosoft_com"  # SharePoint site name
# Full site URL; "personal" is the site type (alternatives: teams, sites).
siteurl = "{0}/personal/{1}".format(baseurl, basesite)

username = "email"
password = "password"

# --- Upload settings -------------------------------------------------------
# Chunk size handed to the upload session in upload_file.
# NOTE(review): presumably bytes (about 200 MB); the original note said "kb" - confirm.
size_chunk = 200 * 1000 * 1000
curr_file_size = 0  # size of the file being uploaded; set by upload_file
upload_list = []  # local files/folders to upload

# --- Authentication --------------------------------------------------------
# Authenticate against the site URL itself, and build the client context on
# that same URL.
ctx_auth = AuthenticationContext(siteurl)
ctx_auth.acquire_token_for_user(username, password)
ctx = ClientContext(siteurl, ctx_auth)
def convert_size(size_bytes):
    """Return *size_bytes* formatted with 1024-based units.

    The value is scaled to the largest unit that keeps it at or above 1 and
    rounded to two decimals, e.g. ``1536`` -> ``"1.5 KB"``. Zero is reported
    as ``"0B"``. Values beyond the largest unit stay expressed in ``YB``
    instead of running off the end of the unit table, and values below one
    byte stay in ``B``.

    Raises:
        ValueError: if *size_bytes* is negative.
    """
    if size_bytes == 0:
        return "0B"
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative, got %r" % (size_bytes,))
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    value = float(size_bytes)
    i = 0
    # Repeated division replaces floor(log(size, 1024)): it cannot produce a
    # negative index for sub-byte values and is clamped to the last unit.
    while value >= 1024 and i < len(size_name) - 1:
        value /= 1024
        i += 1
    return "%s %s" % (round(value, 2), size_name[i])
def extract_name(file_name):
    """Split *file_name* into its base name and its directory part.

    Returns a two-item list ``[basename, directory]``. When *file_name* lies
    under the current working directory, that leading directory is removed, so
    ``<cwd>/sub/f.txt`` gives ``["f.txt", "/sub/"]`` (with the platform's
    separator). Any other path keeps its directory part unchanged.

    Only the leading working directory and the trailing base name are
    stripped; identical text elsewhere in the path is left alone.
    """
    seps = os.sep + (os.altsep or "")
    # Drop a trailing separator (root directory case) so the remainder always
    # starts with a separator when file_name is inside the cwd.
    cwd_prefix = os.getcwd().rstrip(seps)
    remainder = file_name[len(cwd_prefix):]
    inside_cwd = file_name.startswith(cwd_prefix) and (
        not remainder or remainder[0] in seps
    )
    basepath = remainder if inside_cwd else file_name

    basename = os.path.basename(file_name)
    if basename and basepath.endswith(basename):
        directory = basepath[:-len(basename)]
    else:
        directory = basepath
    return [basename, directory]
def print_upload_progress(offset):
    """Print how much of the current file has been uploaded.

    *offset* is the amount uploaded so far; the total is read from the
    module-level ``curr_file_size``. Both are formatted with ``convert_size``.
    """
    if curr_file_size:
        percent = round(offset / curr_file_size * 100, 2)
    else:
        # An empty file has nothing left to send; report it as complete
        # rather than dividing by zero.
        percent = 100.0
    print(
        "Uploaded '{0}' from '{1}'...[{2}%]".format(
            convert_size(offset), convert_size(curr_file_size), percent
        )
    )
def try_get_file(web, url):
    """Fetch the file at server-relative *url*, or ``None`` if it is missing.

    Intended for the overwrite check. A 404 response is turned into ``None``;
    any other request failure is raised as ``ValueError`` carrying the
    response text.
    """
    try:
        remote_file = web.get_file_by_server_relative_url(url)
        return remote_file.get().execute_query()
    except ClientRequestException as err:
        if err.response.status_code != 404:
            raise ValueError(err.response.text)
        return None
def upload_file(local_path):
    """Upload one local file into the site's ``Documents`` folder, then delete it.

    The remote sub-folder is the directory part returned by
    ``extract_name(local_path)``. Each level is added on the server before the
    upload starts. The module-level ``curr_file_size`` is updated so that
    ``print_upload_progress`` can report a percentage. After a successful
    upload the local file is removed.
    """
    global curr_file_size
    curr_file_size = os.path.getsize(local_path)

    dir_name = extract_name(local_path)[1]
    if os.name == "nt":
        dir_name = dir_name.replace("\\", "/")
    # Leading, trailing or doubled slashes produce empty segments; skip them so
    # no folder with an empty name is requested and no "//" ends up in the URL.
    # NOTE(review): a path outside the working directory keeps its full
    # directory (on Windows including the drive) - confirm callers avoid that.
    segments = [part for part in dir_name.split("/") if part]

    target_url = siteurl.replace(baseurl, "") + "/Documents"
    for segment in segments:
        parent = ctx.web.get_folder_by_server_relative_url(target_url)
        ctx.load(parent)
        ctx.execute_query()
        parent.folders.add(segment)
        ctx.execute_query()
        target_url += "/" + segment
    target_folder = ctx.web.get_folder_by_server_relative_url(target_url)

    # The overwrite check (try_get_file(ctx.web, target_url)) is disabled, so
    # the file is always uploaded.
    with open(local_path, "rb") as f:
        print("Uploading {0} to {1}".format(local_path, target_url))
        uploaded_file = target_folder.files.create_upload_session(
            f, size_chunk, print_upload_progress
        ).execute_query_retry(timeout_secs=999999)
        print("File {0} has been uploaded successfully".format(uploaded_file.serverRelativeUrl))
    # Delete only once the handle is closed; an open file cannot be removed on
    # Windows.
    os.remove(local_path)
def upload2sp(uname):
    """Upload *uname*: the file itself, or every file below it if it is a directory.

    Files found while walking a directory are passed to ``upload_file`` as
    absolute paths; a non-directory *uname* is passed through unchanged.
    """
    if not os.path.isdir(uname):
        upload_file(uname)
        return
    for root, _dirs, filenames in os.walk(uname):
        for filename in filenames:
            upload_file(os.path.abspath(os.path.join(root, filename)))
# Upload every entry of upload_list, reporting entries that do not exist locally.
for upload_name in upload_list:
    if not os.path.exists(upload_name):
        print("{0} is not existing".format(upload_name))
        continue
    upload2sp(upload_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment