Last active
March 22, 2025 16:29
-
-
Save guinslym/c587d46bae393db3acd0644d3d3aff4c to your computer and use it in GitHub Desktop.
twitter post automation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import tweepy | |
import schedule | |
import time | |
import requests | |
# Step 1: Twitter API Authentication | |
TWITTER_API_KEY = "your_consumer_key" | |
TWITTER_API_SECRET_KEY = "your_consumer_secret" | |
TWITTER_ACCESS_TOKEN = "your_access_token" | |
TWITTER_ACCESS_TOKEN_SECRET = "your_access_token_secret" | |
auth = tweepy.OAuth1UserHandler(TWITTER_API_KEY, TWITTER_API_SECRET_KEY, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET) | |
api = tweepy.API(auth) | |
# Step 2: File Path to Local CSV | |
FILE_PATH = "tweets.csv" # Replace with the actual path to your downloaded CSV file | |
# Step 3: Function to Post Tweet | |
def post_tweet(): | |
rows = [] | |
updated_rows = [] | |
tweet_posted = False | |
# Read the CSV file | |
with open(FILE_PATH, "r", newline="", encoding="utf-8") as file: | |
reader = csv.DictReader(file) | |
fieldnames = reader.fieldnames | |
rows = list(reader) | |
for row in rows: | |
# Skip rows that are already posted | |
if row.get("Status", "").lower() == "posted": | |
updated_rows.append(row) | |
continue | |
tweet_text = row["Tweet"] # Assuming the column "Tweet" contains the tweet text | |
image_url = row.get("Image URL", "") # Assuming the column "Image URL" contains the image link | |
try: | |
if image_url: | |
# Download and upload the image to Twitter | |
image_path = "temp_image.jpg" | |
response = requests.get(image_url) | |
with open(image_path, "wb") as file: | |
file.write(response.content) | |
media = api.media_upload(image_path) | |
media_id = media.media_id | |
# Post tweet with media | |
api.update_status(status=tweet_text, media_ids=[media_id]) | |
else: | |
# Post tweet without media | |
api.update_status(status=tweet_text) | |
print(f"Tweet posted successfully: {tweet_text}") | |
row["Status"] = "Posted" # Mark the tweet as posted | |
updated_rows.append(row) | |
tweet_posted = True | |
break # Post only one tweet per execution | |
except Exception as e: | |
print(f"Failed to post tweet: {e}") | |
break | |
# Save the updated rows back to the CSV file | |
if tweet_posted: | |
with open(FILE_PATH, "w", newline="", encoding="utf-8") as file: | |
writer = csv.DictWriter(file, fieldnames=fieldnames) | |
writer.writeheader() | |
writer.writerows(updated_rows) | |
# Step 4: Schedule the Tweets | |
schedule.every(2).hours.do(post_tweet) # Schedule the function to run every 2 hours | |
# Keep the script running | |
print("Starting the tweet scheduler...") | |
while True: | |
schedule.run_pending() | |
time.sleep(60) # Check every minute for pending tasks |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gspread | |
import tweepy | |
import schedule | |
import time | |
import requests | |
# Step 1: Twitter API Authentication | |
TWITTER_API_KEY = "your_consumer_key" | |
TWITTER_API_SECRET_KEY = "your_consumer_secret" | |
TWITTER_ACCESS_TOKEN = "your_access_token" | |
TWITTER_ACCESS_TOKEN_SECRET = "your_access_token_secret" | |
auth = tweepy.OAuth1UserHandler(TWITTER_API_KEY, TWITTER_API_SECRET_KEY, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET) | |
api = tweepy.API(auth) | |
# Step 2: Google Sheets Authentication | |
gc = gspread.service_account(filename="path_to_service_account.json") # Path to your Google Service Account JSON file | |
sheet = gc.open("Your Google Sheet Name").sheet1 # Open the first sheet | |
# Step 3: Function to Post Tweet | |
def post_tweet(): | |
# Find the next row with content to post | |
all_rows = sheet.get_all_values() # Fetch all rows | |
header = all_rows[0] # Assuming the first row contains headers | |
rows = all_rows[1:] # Skip the header row | |
for i, row in enumerate(rows): | |
if row[-1] == "": # Check if the last column is empty (indicating not posted) | |
tweet_text = row[0] # Assuming the first column contains the tweet text | |
image_url = row[1] # Assuming the second column contains the image URL | |
# Download and upload image to Twitter | |
try: | |
if image_url: | |
image_path = "temp_image.jpg" | |
response = requests.get(image_url) | |
with open(image_path, "wb") as file: | |
file.write(response.content) | |
media = api.media_upload(image_path) | |
media_id = media.media_id | |
# Post tweet with media | |
api.update_status(status=tweet_text, media_ids=[media_id]) | |
else: | |
# Post tweet without media | |
api.update_status(status=tweet_text) | |
print(f"Tweet posted successfully: {tweet_text}") | |
# Update the Google Sheet to mark the tweet as posted | |
sheet.update_cell(i + 2, len(header), "Posted") # +2 because sheet rows start at 1, and we skipped the header row | |
break # Post only one tweet per execution | |
except Exception as e: | |
print(f"Failed to post tweet: {e}") | |
break | |
# Step 4: Schedule the Tweets | |
schedule.every(2).hours.do(post_tweet) # Schedule the function to run every 2 hours | |
# Keep the script running | |
print("Starting the tweet scheduler...") | |
while True: | |
schedule.run_pending() | |
time.sleep(60) # Check every minute for pending tasks |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment