@reallytiredofclowns
Last active August 19, 2024
Discuit activity summary script
# to do: error checking/resumption code (can use pagination cursor of post to determine when script paused?)
# clean up repetition
# timing issue: if fetching by latest, someone can make a comment
# that puts a post out of the date limits before the looping
# has a chance to fetch the post
# do a second sweep after hitting the date limit?
# would have to store the script's start time and figure out
# when it halts due to hitting the lower date limit and
# reprocess comments according to that...
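# possible resumption sketch (untested idea, not wired in): persist the
# pagination cursor after each processed page, e.g.
#   with open("cursor.txt", "w") as cursorFile:
#       cursorFile.write(nextPage or "")
# and seed nextPage from cursor.txt on startup, so an interrupted run
# can resume near where it stopped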
import requests, time, pandas, datetime
# URL of the last report, to link back to it in the current report
lastReportURL = "https://discuit.net/DiscuitMeta/post/GBoECayW"
# set fromDate to "" to get all
fromDate = "20240811"
toDate = "20240818"
# summary tables show top X items
topX = 10
# no point calculating stats for bots
ignoredUsers = ["autotldr", "FlagWaverBot", "Betelgeuse"]
# userId 000000000000000000000000 is an admin account for collecting
# deleted accounts?
#ignoredIds = ["000000000000000000000000"]
# initial feed nextPage parameter--to be used in eventual resumption code
nextPage = ""
baseURL = "https://discuit.net"
#baseURL = "http://localhost:8080"
##########################################################
def dateFormat(date):
    return date[:10].replace("-", "")

def serverDateToDT(s):
    serverDateFormat = '%Y-%m-%dT%H:%M:%S%z'
    return datetime.datetime.strptime(s, serverDateFormat)

def daysAgo(dt):
    currDateTime = datetime.datetime.now(tz=datetime.timezone.utc)
    return max(0, (currDateTime - dt).days)

# title field may have special characters that need to be escaped
def cleanTitle(title):
    return title.translate(str.maketrans({
        "|": r"\|", "[": r"\[", "]": r"\]", "(": r"\(", ")": r"\)",
        "_": r"\_", "*": r"\*"}))

def fetchFeed(feedNext, disc = None, sort = "activity"):
    args = {"sort": sort, "next": feedNext}
    if disc:
        args["communityId"] = disc
    response = requests.get(rf"{baseURL}/api/posts", args)
    json = response.json()
    return json["posts"], json["next"]

def getFullPost(post):
    return requests.get(
        f"{baseURL}/api/posts/{post['publicId']}").json()

def commentIsValid(comment):
    if comment["deletedAt"]:
        return False
    if comment["editedAt"]:
        commentDate = dateFormat(comment["editedAt"])
    else:
        commentDate = dateFormat(comment["createdAt"])
    if (fromDate != "" and commentDate < fromDate) or\
            commentDate > toDate:
        return False
    username = comment["username"]
    if username in ignoredUsers:
        return False
    return True

def processComments(post, activeUsers):
    fullPost = getFullPost(post)
    # posts from home feed don't seem to contain comments
    comments = fullPost["comments"]
    commentsNext = fullPost["commentsNext"]
    commentCount = 0
    while comments:
        for comment in comments:
            if not commentIsValid(comment):
                continue
            commentCount += 1
            username = comment["username"]
            if username not in activeUsers.index:
                activeUsers.loc[username] = [0] * len(activeUsers.columns)
            activeUsers.loc[username, "Comments"] += 1
        if commentsNext:
            comments = requests.get(
                f"{baseURL}/api/posts/{fullPost['publicId']}/comments",
                {"next": commentsNext}).json()
            comments, commentsNext = comments["comments"], comments["next"]
        else:
            break
    return commentCount

def postIsValid(post):
    reachedTimeLimit = False
    # hardValid is True/False depending on non-date-associated reasons
    # for validity--ignored users, past oldest date to retrieve,
    # deleted post, banned author
    hardValid = True
    # softValid is True/False based solely on the post date--whether
    # the post date falls between the from-to configuration;
    # needed because there could be comments in the post in the
    # date range, but the post itself is not in the date range
    softValid = True
    username = post["username"]
    lastActivityAt = dateFormat(post["lastActivityAt"])
    createdAt = dateFormat(post["createdAt"])
    if not (
            (fromDate <= lastActivityAt <= toDate) or
            (fromDate <= createdAt <= toDate)):
        softValid = False
    if fromDate != "" and lastActivityAt < fromDate:
        reachedTimeLimit = True
        hardValid = False
    elif post["deleted"]:
        hardValid = False
    elif username in ignoredUsers:
        hardValid = False
    return hardValid, reachedTimeLimit, softValid

def processPosts(posts, activeUsers, activeDiscs, topPosts):
    reachedTimeLimit = False
    lastSuccessfulPostDate = ""
    for post in posts:
        # filter out posts that are out of scope
        username = post["username"]
        lastActivityAt = dateFormat(post["lastActivityAt"])
        hardValid, reachedTimeLimit, softValid = postIsValid(post)
        if not hardValid:
            if reachedTimeLimit:
                break
            # skip the post if it is hard-invalid
            continue
        # need to separate invalid due to deletion vs invalid due to time
        # must be a conditionally valid post after above filtering
        title = cleanTitle(post["title"].replace("\n", " "))
        discName = post["communityName"]
        postType = post["type"]  # "text", "image", "link"
        if username not in activeUsers.index:
            activeUsers.loc[username] = [0] * len(activeUsers.columns)
        postType = postType.title() + "s"
        activeUsers.loc[username, postType] += 1
        numComments = processComments(post, activeUsers)
        # if there are no valid comments in the timeframe of interest
        # and the post dates are not in the timeframe, then
        # conclusively skip the post; otherwise count it
        if not numComments and not softValid:
            continue
        if discName not in activeDiscs.index:
            activeDiscs.loc[discName] = [0] * len(activeDiscs.columns)
        activeDiscs.loc[discName, postType] += 1
        activeDiscs.loc[discName, "Comments"] += numComments
        url = f"{baseURL}/{discName}/post/{post['publicId']}"
        if url not in topPosts.index:
            # can init everything to zero: number cells can be incremented
            # and str cells can overwrite original zero
            topPosts.loc[url] = [0] * len(topPosts.columns)
        topPosts.loc[url, "Type"] = postType
        topPosts.loc[url, "Disc"] = discName
        topPosts.loc[url, "Title"] = title
        topPosts.loc[url, "User"] = username
        topPosts.loc[url, "Comments"] = numComments
        lastSuccessfulPostDate = lastActivityAt
    return lastSuccessfulPostDate, reachedTimeLimit

def generateTables(nextPage):
    lastPostDate = ""
    topPosts = pandas.DataFrame({
        "Rank": [], "Type": [], "Disc": [], "Title": [],
        "User": [], "Comments": []})
    activeUsers = pandas.DataFrame({
        "Rank": [], "Texts": [], "Images": [], "Links": [],
        "TotalPosts": [], "Comments": [], "TotalEngagement": []},
        pandas.Index([], name = "User"))
    activeDiscs = pandas.DataFrame({
        "Rank": [], "Texts": [], "Images": [], "Links": [],
        "TotalPosts": [], "Comments": [], "TotalEngagement": []},
        pandas.Index([], name = "Disc"))
    while True:
        print(f"Pagination parameter is: {nextPage}; "
              f"last processed post date was: {lastPostDate}")
        posts, nextPage = fetchFeed(nextPage)
        lastPostDate, reachedTimeLimit = processPosts(
            posts, activeUsers, activeDiscs, topPosts)
        if nextPage is None or reachedTimeLimit:
            break
        time.sleep(2)
    return activeUsers, activeDiscs, topPosts

def topXReport(activeUsers, activeDiscs, topPosts):
    sumPostComments = topPosts["Comments"].sum()
    numDiscs = len(topPosts["Disc"].unique())
    print(f"\n\nDiscuit week in review: {fromDate}-{toDate}\n")
    print(f"\n[Last week's report is here]({lastReportURL}).")
    print("\nDiscuit API is [documented here](https://docs.discuit.net/getting-started). "
          "Source code of script generating the tables is "
          "[available here](https://gist.github.com/reallytiredofclowns/b51f63d042a4b5416ceee282ee524295).")
    registeredAccounts = requests.get(
        f"{baseURL}/api/_initial").json()["noUsers"]
    print(f"\nOver the last week, {len(activeUsers)} users discussed {len(topPosts)} posts in "
          f"{sumPostComments} comments over {numDiscs} total discs. "
          f"At the time of this report, there were {registeredAccounts} accounts.\n")
    print("Felix30 has been [charting some of these numbers here](https://docs.google.com/spreadsheets/d/1H7zV_7YIZar9dwDHbutr0Dm9N6H-1mEXe0irIwSHsx0/edit#gid=1256137398).\n")
    postTypes = topPosts["Type"].unique()
    postTypes.sort()
    for postType in postTypes:
        subset = topPosts.query("Type == @postType").drop(columns = "Type").copy()
        if len(subset):
            subset["User"] = subset["User"].str.replace("_", "\\_")
            subset["Rank"] = subset["Comments"].rank(method = "min", ascending = False)
            subset = subset.query("Rank <= @topX")
            subset = subset.sort_values("Rank")
            subset = subset.reset_index()
            subset["Title"] = "[" + subset["Title"] + "](" + subset["index"] + ")"
            subset = subset.drop(columns = "index")
            print(f"# Top {topX} most engaging {postType}:")
            print(subset.to_markdown(index = False))
            print("\n\n")
    activeDiscs["TotalPosts"] = activeDiscs["Texts"] + activeDiscs["Images"] + activeDiscs["Links"]
    activeDiscs["TotalEngagement"] = activeDiscs["TotalPosts"] + activeDiscs["Comments"]
    activeDiscs["Rank"] = activeDiscs["TotalEngagement"].rank(method = "min", ascending = False)
    # reset the index after filling out the calculations, so
    # the reassignment doesn't break the link with the original
    # input dataframe
    activeDiscs = activeDiscs.reset_index()
    subset = activeDiscs.query("Rank <= @topX")
    subset = subset.sort_values("Rank")
    subset["Disc"] = "[" + subset["Disc"] + f"]({baseURL}/" + subset["Disc"] + ")"
    colOrder = ["Rank"] + [_ for _ in subset.columns if _ != "Rank"]
    subset = subset[colOrder]
    print(f"# Top {topX} most engaging Discs:")
    print(subset.to_markdown(index = False))
    print("\n")
    # remove Ghost user from the active users table
    if "ghost" in activeUsers.index:
        activeUsers.drop("ghost", inplace = True)
    activeUsers["TotalPosts"] = activeUsers["Texts"] + activeUsers["Images"] + activeUsers["Links"]
    activeUsers["TotalEngagement"] = activeUsers["TotalPosts"] + activeUsers["Comments"]
    activeUsers["Rank"] = activeUsers["TotalEngagement"].rank(method = "min", ascending = False)
    # reset the index after filling out the calculations, so
    # the reassignment doesn't break the link with the original
    # input dataframe
    activeUsers = activeUsers.reset_index()
    activeUsers["User"] = activeUsers["User"].str.replace("_", "\\_")
    subset = activeUsers.query("Rank <= @topX")
    subset = subset.sort_values("Rank")
    subset["User"] = "[" + subset["User"] + f"]({baseURL}/@" + subset["User"] + ")"
    colOrder = ["Rank"] + [_ for _ in subset.columns if _ != "Rank"]
    subset = subset[colOrder]
    print(f"# Top {topX} most engaged Discuiteers:")
    print(subset.to_markdown(index = False))

def discLatestActivityReport():
    discActivity = pandas.DataFrame(
        {"DaysSinceLastActivity": []}, pandas.Index([], name = "Disc"))
    communityList = requests.get(f"{baseURL}/api/communities").json()
    for comm in communityList:
        # reset pagination for each disc
        nextPage = ""
        daysSinceActivity = None
        commName = comm["name"]
        commId = comm["id"]
        while True:
            print(commName)
            posts, nextPage = fetchFeed(nextPage, disc = commId)
            if posts:
                # from/to date limit not used here, so not using the postIsValid function
                for post in posts:
                    if post["deletedAt"] or post["author"]["isBanned"] or\
                            post["username"] in ignoredUsers:
                        continue
                    daysSinceActivity = daysAgo(serverDateToDT(post["lastActivityAt"]))
                    break
                if nextPage is None or daysSinceActivity is not None:
                    discActivity.loc[commName] = [daysSinceActivity]
                    break
            else:  # empty disc
                discActivity.loc[commName] = [daysSinceActivity]
                break
            time.sleep(3)
    discActivity.loc[discActivity["DaysSinceLastActivity"] <= 1, "ChartCategory"] = "01) 1 day"
    discActivity.loc[discActivity.query("1 < DaysSinceLastActivity <= 2").index, "ChartCategory"] = "02) 2 days"
    discActivity.loc[discActivity.query("2 < DaysSinceLastActivity <= 3").index, "ChartCategory"] = "03) 3 days"
    discActivity.loc[discActivity.query("3 < DaysSinceLastActivity <= 4").index, "ChartCategory"] = "04) 4 days"
    discActivity.loc[discActivity.query("4 < DaysSinceLastActivity <= 5").index, "ChartCategory"] = "05) 5 days"
    discActivity.loc[discActivity.query("5 < DaysSinceLastActivity <= 6").index, "ChartCategory"] = "06) 6 days"
    discActivity.loc[discActivity.query("6 < DaysSinceLastActivity <= 7").index, "ChartCategory"] = "07) 1 week"
    discActivity.loc[discActivity.query("7 < DaysSinceLastActivity <= 14").index, "ChartCategory"] = "08) 2 weeks"
    discActivity.loc[discActivity.query("14 < DaysSinceLastActivity <= 21").index, "ChartCategory"] = "09) 3 weeks"
    discActivity.loc[discActivity.query("21 < DaysSinceLastActivity <= 28").index, "ChartCategory"] = "10) 4 weeks"
    discActivity.loc[28 < discActivity["DaysSinceLastActivity"], "ChartCategory"] = "11) > 4 weeks"
    discActivity.loc[discActivity["DaysSinceLastActivity"].isna(), "ChartCategory"] = "12) No activity"
    return discActivity

def modActivityReport():
    discActivity = pandas.DataFrame(
        {"CreatedDaysAgo": [], "ActivityDaysAgo": [], "ModActivityDaysAgo": []},
        pandas.Index([], name = "Disc"))
    discList = requests.get(f"{baseURL}/api/communities").json()
    for disc in discList:
        time.sleep(3)
        # reset variables for each disc
        discName = disc["name"]
        print("Looping for", discName)
        discId = disc["id"]
        # communities API doesn't appear to return full data, so do a second request
        discData = requests.get(f"{baseURL}/api/communities/{disc['name']}", {"byName": "true"}).json()
        discMods = discData["mods"]
        discLastActivity = None
        modLastActivity = None
        discCreated = daysAgo(serverDateToDT(discData["createdAt"]))
        posts, _ = fetchFeed("", disc = discId)
        if posts:
            post = posts[0]
            discLastActivity = daysAgo(serverDateToDT(post["lastActivityAt"]))
        modActivityList = []
        for mod in discMods:
            response = requests.get(f"{baseURL}/api/users/{mod['username']}/feed", {"limit": 1})
            # possibility of mod being banned, which would return 403 error... or 401?
            if response.status_code in (401, 403):
                continue
            activityItem = response.json()["items"]
            if not activityItem:
                continue
            activityItem = activityItem[0]
            # seems comments have a postId and posts do not?
            if "postId" in activityItem:
                tempList = [activityItem["item"]["createdAt"],  # activityItem["item"]["lastActivityAt"],
                            activityItem["item"]["editedAt"]]
                tempList = [_ for _ in tempList if _ is not None]
                currModActivity = max(tempList)
            else:  # comment
                tempList = [activityItem["item"]["createdAt"], activityItem["item"]["editedAt"]]
                tempList = [_ for _ in tempList if _ is not None]
                currModActivity = max(tempList)
            modActivityList.append(daysAgo(serverDateToDT(currModActivity)))
        if modActivityList:
            modLastActivity = min(modActivityList)
        discActivity.loc[discName] = [discCreated, discLastActivity, modLastActivity]
    return discActivity

######################################################
activeUsers, activeDiscs, topPosts = generateTables(nextPage)
topXReport(activeUsers, activeDiscs, topPosts)
#discActivity = discLatestActivityReport()
#discModReport = modActivityReport()
@reallytiredofclowns (Author)

I'm happy to look over ideas but not really interested in turning it into a full-fledged repo for collaboration at this time (not really interested in the management aspect of it). If someone wants to take it over, I'm more than happy to step aside--it would give me some time to go and look at other things.

Thanks for the suggestion about the `re`-based replacement. I think my original impetus for the escaping was (1) newlines in titles screwing up the tables when the text was converted to markdown, and (2) the same for user-supplied tagging (e.g., marking geographic region [USA] in news articles) in the titles. The existing code seems sufficient for now, but I can keep your suggestion in mind if the escaping needs to be more aggressive in the future.
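
For reference, a minimal sketch of what an `re`-based version of the escaping might look like (illustrative only: the `cleanTitleRe` name and the exact character set are assumptions carried over from the `cleanTitle` function above, not an agreed-on design):

```python
import re

# the same markdown-significant characters cleanTitle escapes via str.translate
_MD_SPECIALS = re.compile(r"([|\[\]()_*])")

def cleanTitleRe(title):
    # prefix each matched character with a backslash;
    # \g<1> re-inserts the character that was matched
    return _MD_SPECIALS.sub(r"\\\g<1>", title)
```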

@MarkMoretto

@reallytiredofclowns Gotcha. I was thinking about the escaping after posting and realized you use the markdown feature in pandas (DataFrame.to_markdown(...)). I haven't used the markdown package much, but I'm surprised it doesn't handle the things you mentioned.

Another thing that I wanted to explore with this script was speeding it up a bit (like you mentioned). I can leave another comment with an idea if I come up with something, but will also respect your stance of not messing with it too much.
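
For example, one low-risk idea, sketched below under the assumption that much of the runtime is per-request connection overhead: route all HTTP calls through a single `requests.Session`, which keeps the TCP/TLS connection alive across calls. The `session` name is hypothetical, and the script's other `requests.get` calls would need the same treatment:

```python
import requests

baseURL = "https://discuit.net"

# a shared session reuses the underlying connection across requests,
# avoiding a fresh TCP/TLS handshake for every API call
session = requests.Session()

def fetchFeed(feedNext, disc = None, sort = "activity"):
    args = {"sort": sort, "next": feedNext}
    if disc:
        args["communityId"] = disc
    json = session.get(f"{baseURL}/api/posts", params = args).json()
    return json["posts"], json["next"]
```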

Thanks for the response! I appreciate the insight.
