All checks were successful
Build and push / changes (push) Successful in 51s
Build and push / Lint-Python (push) Successful in 4s
Build and push / Build-and-Push-Docker (push) Successful in 2m47s
Build and push / sync-argocd-app (push) Successful in 3s
Build and push / post-status-to-discord (push) Successful in 5s
184 lines
5.6 KiB
Python
Executable File
184 lines
5.6 KiB
Python
Executable File
#!/usr/local/bin/python
|
||
from discord.ext import commands
|
||
import core_utils
|
||
import discord
|
||
import os
|
||
import requests
|
||
import requests_cache
|
||
|
||
requests_cache.install_cache(
|
||
"request-cache", backend="sqlite", expire_after=300, allowable_methods=("GET",)
|
||
)
|
||
|
||
intents = discord.Intents.default()
|
||
intents.message_content = True
|
||
intents.members = True
|
||
intents.reactions = True
|
||
bot = commands.Bot(intents=intents)
|
||
|
||
cogfiles = [
|
||
f"cogs.{filename[:-3]}"
|
||
for filename in os.listdir("/app/cogs/")
|
||
if filename.endswith(".py")
|
||
]
|
||
|
||
for cogfile in cogfiles:
|
||
bot.load_extension(cogfile)
|
||
|
||
welcome_blurb = requests.get(core_utils.json_endpoint + "roles.json").json()
|
||
|
||
|
||
@bot.event
|
||
async def on_ready():
|
||
print(f"{bot.user.name} has connected to Discord!")
|
||
|
||
if os.getenv("DRAGON_ENV") == "prod":
|
||
# await bot.get_channel(152921472304676865).send("I have reconnected")
|
||
|
||
# Waiting-room in zoid's server
|
||
channel_id = 1026281775984549958
|
||
message_id = 1099374735428681758
|
||
|
||
# Testing
|
||
# blap
|
||
# channel_id = 932476007439552522
|
||
# message_id = 1236011522572943382
|
||
|
||
# message = await bot.get_channel(channel_id).send(blurb)
|
||
# Update the message on_ready to match the content we always want to be there
|
||
message = await bot.get_channel(channel_id).fetch_message(message_id)
|
||
await message.edit(content=welcome_blurb["welcome_message"])
|
||
|
||
|
||
@bot.event
|
||
async def on_raw_reaction_add(payload):
|
||
guild = bot.get_guild(payload.guild_id)
|
||
|
||
role_map = welcome_blurb["role_map"]
|
||
|
||
if payload.channel_id == 1026281775984549958:
|
||
role = discord.utils.get(guild.roles, name=role_map[payload.emoji.name])
|
||
member = guild.get_member(payload.user_id)
|
||
await member.add_roles(role)
|
||
|
||
|
||
@bot.listen("on_message")
|
||
async def convert_heic_to_jpg(ctx):
|
||
from cmagick import cmagick
|
||
import time
|
||
import tempfile
|
||
|
||
if ctx.attachments:
|
||
for attachment in ctx.attachments:
|
||
if attachment.filename.lower().endswith(("heic", "tiff")):
|
||
source_file = "/tmp/source"
|
||
source_file, file_path = tempfile.mkstemp()
|
||
jpg_file = "/tmp/%s.jpg" % time.time()
|
||
await attachment.save(fp=file_path)
|
||
|
||
cmagick.convert(file_path, jpg_file)
|
||
|
||
try:
|
||
await ctx.delete()
|
||
except Exception:
|
||
pass
|
||
await ctx.channel.send(
|
||
"%s said:\n%s" % (ctx.author.mention, ctx.content),
|
||
file=discord.File(jpg_file),
|
||
)
|
||
os.remove(file_path)
|
||
return
|
||
|
||
|
||
@bot.listen("on_message")
|
||
async def fix_social_media_links(ctx):
|
||
headers = {"Cache-Control": "no-store"}
|
||
correct_domains = requests.get(
|
||
core_utils.json_endpoint + "social_media_domains.json", headers=headers
|
||
).json()
|
||
|
||
if ctx.author.id == bot.user.id:
|
||
return
|
||
|
||
if "instagram.com" in ctx.content and "reel" not in ctx.content:
|
||
return
|
||
|
||
for k in correct_domains.keys():
|
||
if ctx.content.startswith(k) or ctx.content.startswith("www.{k}.com"):
|
||
await ctx.channel.send(
|
||
"%s said:\n%s"
|
||
% (
|
||
ctx.author.mention,
|
||
ctx.content.replace(k, "https://%s.com" % correct_domains[k]).split(
|
||
"?"
|
||
)[0],
|
||
)
|
||
)
|
||
await ctx.delete()
|
||
return
|
||
|
||
|
||
@bot.event
|
||
async def on_message(ctx):
|
||
if str(bot.user.id) in ctx.content:
|
||
# if ctx.author.id == core_utils.my_id:
|
||
# pass
|
||
# elif ctx.guild.id not in core_utils.my_guilds:
|
||
# return
|
||
|
||
llm_rules = requests.get(core_utils.json_endpoint + "rules.json").json()
|
||
|
||
if ctx.author.id in llm_rules["disallowed_users"]:
|
||
responses = [
|
||
"You cant do that right now",
|
||
"You cant use this feature right now",
|
||
"You're on time out from this",
|
||
]
|
||
await ctx.respond(random.choice(responses))
|
||
return
|
||
|
||
url = f"http://{core_utils.ein_ip}:7869/api/chat"
|
||
instructions = llm_rules["prompt"]
|
||
|
||
payload = {
|
||
"messages": [
|
||
{
|
||
"role": "system",
|
||
"content": instructions,
|
||
},
|
||
{
|
||
"content": ctx.content.replace(str(bot.user.id), "").replace(
|
||
"<@> ", ""
|
||
),
|
||
"role": "user",
|
||
},
|
||
],
|
||
"options": {"num_ctx": 1999},
|
||
"model": llm_rules["model"],
|
||
"stream": False,
|
||
"stop": llm_rules["stop_tokens"],
|
||
# "max_tokens": 4096,
|
||
# "frequency_penalty": 0,
|
||
# "presence_penalty": 0,
|
||
# "temperature": 0.7,
|
||
# "top_p": 0.95,
|
||
}
|
||
headers = {"Content-Type": "application/json"}
|
||
|
||
try:
|
||
await ctx.channel.trigger_typing()
|
||
response = requests.post(url, json=payload, headers=headers)
|
||
answer = response.json()["message"]["content"]
|
||
|
||
if len(answer) > 2000:
|
||
await ctx.reply(answer[:2000].replace("<|end▁of▁sentence|>", ""))
|
||
await ctx.reply(answer[2000:].replace("<|end▁of▁sentence|>", ""))
|
||
else:
|
||
await ctx.reply(answer.replace("<|end▁of▁sentence|>", ""))
|
||
except Exception as e:
|
||
print(e)
|
||
await ctx.reply("Somethings wrong, maybe the LLM crashed")
|
||
|
||
|
||
bot.run(os.getenv("discord_token"))
|