Skip to content

Instantly share code, notes, and snippets.

@guinslym
Created December 19, 2024 15:52
Show Gist options
  • Save guinslym/54c46e14f4cd495510e36be05ca87315 to your computer and use it in GitHub Desktop.
Save guinslym/54c46e14f4cd495510e36be05ca87315 to your computer and use it in GitHub Desktop.
import aioxmpp
import logging
import asyncio
# Configure the root logger at DEBUG level so library log output is shown.
logging.basicConfig(level=logging.DEBUG)
# Account (JID) the client logs in as.
# NOTE(review): "[email protected]" looks like residue of e-mail obfuscation
# rather than a real address -- replace it with the actual account JID.
JID = aioxmpp.JID.fromstr("[email protected]")
# Password for the account above, hard-coded as a sample value.
# NOTE(review): do not commit real credentials in source.
PASSWORD = "secretpassowod"
async def connect_to_libraryh3lp():
    """Connect to the LibraryH3lp XMPP server and hold the session briefly.

    Logs in with the module-level ``JID`` and ``PASSWORD``, prints a line
    when the XMPP stream is established and when it is destroyed, keeps the
    connection open for 30 seconds and then disconnects.

    Any exception raised while connecting or while connected is caught and
    printed instead of being propagated to the caller.
    """
    # Create a new client
    client = aioxmpp.PresenceManagedClient(
        JID,
        aioxmpp.make_security_layer(PASSWORD),
    )

    # The handlers accept (and ignore) any arguments so they work regardless
    # of what the signal passes along when it fires.
    def report_established(*_args):
        print("Connected to LibraryH3lp successfully!")

    def report_destroyed(*_args):
        print("Disconnected from LibraryH3lp.")

    # Stream lifecycle notifications are exposed as signals on the client
    # object; there is no ``aioxmpp.Stream.State`` enum and the stanza
    # stream has no generic ``register_callback`` method.
    client.on_stream_established.connect(report_established)
    client.on_stream_destroyed.connect(report_destroyed)

    try:
        print("Connecting to LibraryH3lp...")
        async with client.connected():
            print("Connected! Waiting to receive messages...")
            await asyncio.sleep(30)  # Keep the connection open for 30 seconds
    except Exception as e:
        # Top-level boundary of this demo script: report and return.
        print(f"Error connecting to LibraryH3lp: {e}")
if __name__ == "__main__":
    # Run the connection demo only when executed as a script, not on import.
    asyncio.run(connect_to_libraryh3lp())
import aioxmpp
import logging
import asyncio
# Account (JID) the client logs in as.
# NOTE(review): "[email protected]" looks like residue of e-mail obfuscation
# rather than a real address -- replace it with the actual account JID.
JID = aioxmpp.JID.fromstr("[email protected]")
# Password for the account above, hard-coded as a sample value.
# NOTE(review): do not commit real credentials in source.
PASSWORD = "secretpassword"
async def connect_and_listen_for_messages():
    """Connect to LibraryH3lp and print incoming messages for 60 seconds.

    Logs in with the module-level ``JID`` and ``PASSWORD``, registers a
    handler for incoming ``chat`` and ``normal`` message stanzas from any
    sender, keeps the connection open for 60 seconds and then disconnects.

    Any exception raised while connecting or while connected is caught and
    printed instead of being propagated to the caller.
    """
    # Imported here so the block is self-contained; the file's top-level
    # ``import aioxmpp`` is not relied on to load the dispatcher submodule.
    from aioxmpp.dispatcher import SimpleMessageDispatcher

    client = aioxmpp.PresenceManagedClient(
        JID,
        aioxmpp.make_security_layer(PASSWORD),
    )

    def on_message_received(msg):
        """Handle incoming messages."""
        # Stanzas such as chat-state notifications carry no body at all;
        # indexing ``msg.body[None]`` on those would raise KeyError.
        if not msg.body:
            return
        # ``any()`` prefers the untagged (None) body and otherwise falls
        # back to whichever language-tagged body is present.
        print(f"Received message from {msg.from_}: {msg.body.any()}")

    # The client has no ``message_received`` signal; incoming messages are
    # routed through the message dispatcher service instead.
    dispatcher = client.summon(SimpleMessageDispatcher)
    for message_type in (aioxmpp.MessageType.CHAT, aioxmpp.MessageType.NORMAL):
        # ``None`` as the sender matches messages from any JID.
        dispatcher.register_callback(message_type, None, on_message_received)

    try:
        print("Connecting to LibraryH3lp...")
        async with client.connected():
            print("Connected! Listening for incoming messages...")
            await asyncio.sleep(60)  # Keep the connection open for 60 seconds
    except Exception as e:
        # Top-level boundary of this demo script: report and return.
        print(f"Error: {e}")
if __name__ == "__main__":
    # Run the message listener only when executed as a script, not on import.
    asyncio.run(connect_and_listen_for_messages())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment