Last active
August 13, 2023 08:24
-
-
Save KhaledBinAmir/091f6d431d935149451a8f6471e3e50e to your computer and use it in GitHub Desktop.
Telegram webhook handler for Employee Self Service, written as a Server Script for ERPNext (Frappe Framework)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Telegram webhook handler for Employee Self Service (Frappe/ERPNext Server Script).
#
# Inputs, all read from frappe.form_dict:
#   request_source -- when equal to "Employee List", SMS enrolment links are sent
#                     to the employees listed (JSON) in form_dict.employees.
#   message        -- the "message" part of a Telegram update, if present.
#   callback_query -- the "callback_query" part of a Telegram update, if present.
#
# Only chats of type 'private' are served. Every failure is logged through
# frappe.log_error and swallowed, so the caller never sees an exception.
#
# The script imports nothing: `frappe` and `json` are expected to be supplied by
# the runtime that executes it. String concatenation is kept on purpose to match
# the original style.

NOT_LINKED_REPLY = 'This telegram account is not linked with any employee profile.\n\nFor assistance, please contact HR.'
SECRET_PREFIX = 'MKTS_'
SECRET_LENGTH = 25  # len(SECRET_PREFIX) + the 20-character generated hash

try:
    request_source = frappe.form_dict.request_source
    message = frappe.form_dict.message
    callback_query = frappe.form_dict.callback_query

    if request_source == "Employee List":
        # ---- HR action: invite employees that have no Telegram chat linked yet ----
        for employee_id in json.loads(frappe.form_dict.employees):
            employee = frappe.db.get_value(
                'Employee',
                employee_id,
                ['cell_number', 'telegram_chat_id', 'employee_name', 'telegram_secret'],
                as_dict=1,
            )
            # Skip unknown ids (get_value returned nothing), employees that are
            # already linked, and employees without a phone number to text.
            if not employee or employee.telegram_chat_id or not employee.cell_number:
                continue

            # Reuse a pending secret so a previously sent link stays valid.
            telegram_secret = employee.telegram_secret
            if not telegram_secret:
                telegram_secret = SECRET_PREFIX + frappe.utils.generate_hash(length=20)
                frappe.db.set_value('Employee', employee_id, 'telegram_secret', telegram_secret)

            sms_text = 'Hi '+employee.employee_name+',\nplease link your Telegram to your Employee Profile via this url (make sure Telegram app is set up first):\n\nhttps://t.me/MK_Electronics_Bot?start='+telegram_secret
            frappe.call(
                'frappe.core.doctype.sms_settings.sms_settings.send_sms',
                receiver_list=[employee.cell_number],
                msg=sms_text,
                success_msg=False,
            )
        frappe.response['message'] = "Sent to non telegram enlisted employees"

    elif message or callback_query:
        # ---- Telegram update: a plain message or an inline-keyboard callback ----
        chat = message['chat'] if message else callback_query['message']['chat']
        if chat and chat['type'] == 'private':
            chat_id = chat['id']
            # Non-text messages (photos, stickers, ...) carry no 'text' key.
            chat_text = message.get('text') if message else None
            callback_data = (callback_query.get('data') or '') if callback_query else None
            # Set only for callbacks; it is forwarded to the send helper as message_id.
            message_id = callback_query['message']['message_id'] if callback_query else None

            # Main menu: "/employee" command or the "« Main Menu" button.
            if chat_text == "/employee" or callback_data == "employee":
                chat_employee = frappe.db.get_value(
                    'Employee', {'telegram_chat_id': chat_id}, ['name', 'employee_name'], as_dict=True
                )
                if chat_employee:
                    reply_markup = {
                        "inline_keyboard": [
                            [{"text": "Attendance Report", "callback_data": "emp::attendance_report"}],
                            [{"text": "Leave Balance", "callback_data": "emp::leave_balance"}],
                            [{"text": "Salary Slip", "callback_data": "emp::salary"}],
                            [{"text": "Profile", "callback_data": "emp::profile"}],
                        ]
                    }
                    frappe.call(
                        'mke.custom_api.send_telegram_message',
                        reciepient_chat_ids=[chat_id],
                        caption='Please select employee services',
                        reply_markup=reply_markup,
                        message_id=message_id,
                    )
                else:
                    frappe.call(
                        'mke.custom_api.send_telegram_message',
                        reciepient_chat_ids=[chat_id],
                        caption=NOT_LINKED_REPLY,
                    )

            if message:
                # ---- Enrolment: the text carries the secret from the SMS link ----
                if chat_text and SECRET_PREFIX in chat_text:
                    # First occurrence of the prefix that is followed by
                    # alphanumeric characters wins (same rule as before, scanned once).
                    secret_key = None
                    start = chat_text.find(SECRET_PREFIX)
                    while start != -1:
                        candidate = chat_text[start:start + SECRET_LENGTH]
                        if candidate[len(SECRET_PREFIX):].isalnum():
                            secret_key = candidate
                            break
                        start = chat_text.find(SECRET_PREFIX, start + 1)

                    matched_employee = None
                    if secret_key:
                        matched_employee = frappe.db.get_value(
                            'Employee',
                            {'telegram_secret': secret_key},
                            ['name', 'employee_name', 'telegram_chat_id'],
                            as_dict=True,
                        )

                    if not matched_employee:
                        # NOTE(review): the original indentation was lost, so it is
                        # unclear whether this reply was tied to a malformed secret
                        # or to an unmatched one; it is sent in both cases here.
                        reply_message = 'Hello ' + message['from']['first_name'] + ',\nYour enrollment url appears to be invalid.\n\nPlease reach out to HR for assistance.'
                    elif matched_employee.telegram_chat_id:
                        which_account = 'another Telegram account'
                        if matched_employee.telegram_chat_id == str(chat_id):
                            which_account = 'this Telegram account'
                        reply_message = 'Dear ' + matched_employee.employee_name + ',\n' + which_account + ' is already assigned for your employee notifications.\n\nFor assistance, please contact HR.'
                    else:
                        # One Telegram account may be linked to a single employee only.
                        chat_id_exist = frappe.db.get_value(
                            'Employee',
                            {"telegram_chat_id": chat_id},
                            ['name', 'employee_name', 'telegram_chat_id'],
                            as_dict=True,
                        )
                        if chat_id_exist:
                            reply_message = 'This telegram account is already linked with employee profile of '+chat_id_exist.name+' : '+chat_id_exist.employee_name+' for notifications.\n\nFor assistance, please contact HR.'
                        else:
                            # Link the chat and clear the secret so the link is single-use.
                            frappe.db.set_value(
                                'Employee',
                                matched_employee.name,
                                {'telegram_chat_id': chat_id, 'telegram_secret': None},
                            )
                            reply_message = "Dear " + matched_employee.employee_name + ",\nYour Telegram account has been successfully linked to your Employee profile. You'll receive notifications here."

                    frappe.call(
                        'mke.custom_api.send_telegram_message',
                        reciepient_chat_ids=[chat_id],
                        caption=reply_message,
                    )

            elif callback_query:
                if callback_data.startswith('doc::'):
                    # ---- Document request: doc::<doctype>::<name>::<print format> ----
                    # NOTE(review): doctype and document name come straight from the
                    # callback payload and are not checked against the employee linked
                    # to this chat - confirm that mke.custom_api enforces permissions.
                    doc_ref = callback_data.split('::')
                    doctype_name = doc_ref[1]
                    doc_name = doc_ref[2]
                    print_format = doc_ref[3]
                    reply_message = doctype_name +' : '+doc_name
                    frappe.call(
                        'mke.custom_api.enqueue_telegram_send_doc_pdf',
                        reciepient_chat_ids=[chat_id],
                        caption=reply_message,
                        doctype=doctype_name,
                        doc_name=doc_name,
                        print_format=print_format,
                        print_letterhead=True,
                        enqueue_after_commit=False,
                        message_id=message_id,
                    )
                    frappe.call(
                        'mke.custom_api.send_telegram_message',
                        reciepient_chat_ids=[chat_id],
                        caption='Please wait for PDF of:\n'+reply_message,
                        message_id=message_id,
                    )

                elif callback_data.startswith("emp::"):
                    # ---- Employee self-service menu entries ----
                    # date_of_joining is assumed to be the standard Employee field
                    # holding the joining date - confirm for customised doctypes.
                    chat_employee = frappe.db.get_value(
                        'Employee',
                        {'telegram_chat_id': chat_id},
                        ['name', 'employee_name', 'company', 'designation', 'department', 'date_of_joining'],
                        as_dict=True,
                    )
                    if not chat_employee:
                        frappe.call(
                            'mke.custom_api.send_telegram_message',
                            reciepient_chat_ids=[chat_id],
                            caption=NOT_LINKED_REPLY,
                        )

                    elif callback_data == "emp::attendance_report":
                        reply_markup = {
                            "inline_keyboard": [
                                [{"text": "This Month's Attendance", "callback_data": "emp::this_months_attendance"}],
                                [{"text": "Last Month's Attendance", "callback_data": "emp::last_months_attendance"}],
                                [{"text": "« Main Menu", "callback_data": "employee"}],
                            ]
                        }
                        frappe.call(
                            'mke.custom_api.send_telegram_message',
                            reciepient_chat_ids=[chat_id],
                            caption='Select Report Period',
                            reply_markup=reply_markup,
                            message_id=message_id,
                        )

                    elif callback_data == "emp::this_months_attendance" or callback_data == "emp::last_months_attendance":
                        month_date = frappe.utils.getdate()
                        if callback_data == "emp::last_months_attendance":
                            month_date = frappe.utils.add_to_date(month_date, months=-1)
                        attendance_report = frappe.call(
                            "frappe.desk.query_report.run",
                            report_name="Attendance Report",
                            filters={
                                "employee": chat_employee.name,
                                "date_from": frappe.utils.get_first_day(month_date),
                                "date_to": frappe.utils.get_last_day(month_date),
                            },
                        )
                        report_formatted = ['<b>Attendance Report</b>']
                        for row in attendance_report['report_summary']:
                            report_formatted.append(row['label']+' : '+str(row['value']))
                        report_formatted.append('--------')
                        for row in attendance_report['result']:
                            # One line per day: "<dd MMM> | <status> | <LE/EE flags>".
                            line = frappe.utils.format_date(row['attendance_date'], 'dd MMM')
                            if row['status']:
                                line = line + ' | ' + row['status']
                            flags = []
                            if row['late_entry']:
                                flags.append('LE')
                            if row['early_exit']:
                                flags.append('EE')
                            if flags:
                                line = line + ' | ' + ' '.join(flags)
                            report_formatted.append(line)
                        frappe.call(
                            'mke.custom_api.send_telegram_message',
                            reciepient_chat_ids=[chat_id],
                            caption='\n'.join(report_formatted),
                            message_id=message_id,
                        )

                    elif callback_data == "emp::leave_balance":
                        today = frappe.utils.getdate()
                        leave_report = frappe.call(
                            "frappe.desk.query_report.run",
                            report_name="Employee Leave Balance",
                            filters={
                                "employee": chat_employee.name,
                                "from_date": frappe.utils.get_year_start(today),
                                "to_date": today,
                                "company": chat_employee.company,
                            },
                        )
                        report_formatted = ['<b>Leave Balance</b>', '--------', '']
                        for row in leave_report['result']:
                            # Leave types with neither allocation nor usage are skipped.
                            if row['leaves_allocated'] or row['leaves_taken']:
                                report_formatted.append('<b>'+row['leave_type']+' :</b> '+str(row['closing_balance']))
                        frappe.call(
                            'mke.custom_api.send_telegram_message',
                            reciepient_chat_ids=[chat_id],
                            caption='\n'.join(report_formatted),
                            message_id=message_id,
                        )

                    elif callback_data == "emp::profile":
                        # The joining date used to be a hard-coded literal; it now comes
                        # from the employee record. Empty fields render as blanks
                        # instead of raising on str + None.
                        joining_date = ''
                        if chat_employee.date_of_joining:
                            joining_date = frappe.utils.format_date(chat_employee.date_of_joining, 'dd-MM-yyyy')
                        reply_message = "Employee Name: "+(chat_employee.employee_name or '')+"\nJoining Date: "+joining_date+"\nDesignation: "+(chat_employee.designation or '')+"\nDepartment: "+(chat_employee.department or '')
                        frappe.call(
                            'mke.custom_api.send_telegram_message',
                            reciepient_chat_ids=[chat_id],
                            caption=reply_message,
                            message_id=message_id,
                        )

                    elif callback_data == "emp::salary":
                        salary_slips = frappe.db.get_list(
                            "Salary Slip",
                            filters={"employee": chat_employee.name},
                            fields=["name", "start_date"],
                            order_by='start_date desc',
                            page_length=5,
                        )
                        if salary_slips:
                            # NOTE(review): Telegram limits callback_data to 64 bytes;
                            # long Salary Slip names may exceed that - confirm.
                            keyboard_data = [
                                [{
                                    "text": frappe.utils.format_date(ss.start_date, 'MMMM yyyy'),
                                    "callback_data": "doc::Salary Slip::"+ss.name+"::Salary Slip Standard",
                                }]
                                for ss in salary_slips
                            ]
                            frappe.call(
                                'mke.custom_api.send_telegram_message',
                                reciepient_chat_ids=[chat_id],
                                caption='Select your salary slip',
                                reply_markup={"inline_keyboard": keyboard_data},
                                message_id=message_id,
                            )
                        else:
                            reply_message = 'No Salary Slip is found for employee '+chat_employee.name+ ' : '+chat_employee.employee_name+'\n\nContact HR for further assistance.'
                            frappe.call(
                                'mke.custom_api.send_telegram_message',
                                reciepient_chat_ids=[chat_id],
                                caption=reply_message,
                                message_id=message_id,
                            )

                    else:
                        frappe.call(
                            'mke.custom_api.send_telegram_message',
                            reciepient_chat_ids=[chat_id],
                            caption='This service is not available yet',
                            message_id=message_id,
                        )
except Exception as e:
    # Best effort: record the failure instead of propagating it to the caller.
    frappe.log_error(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment