|
import asyncio |
|
from typing import Any |
|
|
|
from discord.ext import commands |
|
|
|
|
|
|
|
class Test(commands.Cog): |
|
'''test cog''' |
|
|
|
def __init__(self): |
|
|
|
# We need to store the concurrencies. I am storing it in the cog scope attached to self, for easy access. |
|
self.user_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.user, wait=False) |
|
self.channel_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.channel, wait=False) |
|
|
|
@commands.command() |
|
async def test(self, ctx: commands.Context[commands.Bot]): |
|
"""Test command.""" |
|
m = await ctx.send('sleeping ten seconds') |
|
await asyncio.sleep(10) |
|
await m.edit(content='slept') |
|
|
|
@test.before_invoke |
|
async def test_before(self, ctx: commands.Context[commands.Bot]): |
|
"""This triggers before the command runs.""" |
|
|
|
await self.user_concurrency.acquire(ctx.message) |
|
# First, we acquire the first concurrency. |
|
# which, if its already acquired it will |
|
# raise an error because wait is set to true. |
|
|
|
try: |
|
await self.channel_concurrency.acquire(ctx.message) |
|
# Then, we acquire our second concurrency. |
|
|
|
except commands.MaxConcurrencyReached as error: |
|
# If this second concurrency fails, we need to release the |
|
# first one we just acquired since it will not go through, and |
|
# if we don't do this, the first one will be locked forever |
|
# because the after_invoke hook will not trigger. |
|
await self.user_concurrency.release(ctx.message) |
|
# Then, we raise the error so it propagates to the error handlers |
|
# and it doesn't just get eaten. If we don't raise it will instead |
|
# work without raising an error. |
|
raise error |
|
|
|
@test.after_invoke |
|
async def test_after(self, ctx: commands.Context[commands.Bot]): |
|
"""This triggers after the command ran.""" |
|
|
|
# Here in the after, we just release both concurrencies so |
|
# the commands can be ran again by other people. |
|
await self.user_concurrency.release(ctx.message) |
|
await self.channel_concurrency.release(ctx.message) |
It won't be needed by most, but if one has enough concurrencies that managing the try-excepts becomes a pain, this can be refactored to work on a sequence of
MaxConcurrency
instances. Same principles apply, but I figured it was worth mentioning.