All checks were successful
Build and push / changes (push) Successful in 3s
Build and push / Lint-Python (push) Successful in 1s
Build and push / Build-and-Push-Docker (push) Successful in 2m20s
Build and push / post-status-to-discord (push) Successful in 1s
Build and push / sync-argocd-app (push) Successful in 2s
183 lines
4.9 KiB
Python
Executable File
183 lines
4.9 KiB
Python
Executable File
import requests
|
|
import discord
|
|
import os
|
|
import json
|
|
import httpx
|
|
|
|
# Numeric Discord IDs used by the bot (presumably guild/server IDs and the
# owner's user ID -- confirm against the callers that import them).
my_guilds = [826547484632678450, 152921472304676865]
my_id = 144986109804412928

# Host that send_to_llm() contacts for its chat API.
ein_ip = "192.168.1.137"

# Base URL that rules.json is fetched from. DRAGON_ENV=prod selects the
# in-cluster service name; any other value (or unset) uses the LAN address.
json_endpoint = (
    "http://dragon-bot-json.dragon-bot.svc.cluster.local/"
    if os.getenv("DRAGON_ENV") == "prod"
    else "http://192.168.1.200/"
)
|
|
|
|
|
|
def remove_between(text, string1, string2):
    """Cut the first ``string1 ... string2`` span out of ``text``.

    Both delimiters are removed together with everything between them.

    Args:
        text: String to search.
        string1: Opening delimiter.
        string2: Closing delimiter; it is searched for starting right after
            the position where the opening delimiter was found.

    Returns:
        ``text`` without the delimited span, or ``text`` unchanged when
        either delimiter cannot be located.
    """
    opening = text.find(string1)
    closing = text.find(string2, opening + len(string1))

    # Guard clause: leave the text alone unless both delimiters were found.
    if opening == -1 or closing == -1:
        return text

    before = text[:opening]
    after = text[closing + len(string2):]
    return before + after
|
|
|
|
|
|
def download_image(url, path=None):
    """Download the image at ``url`` and save it to disk.

    Args:
        url: Address of the image to fetch.
        path: Destination file. When omitted, ``/tmp/image.<extension>`` is
            used, with the extension taken from the response Content-Type.

    Returns:
        The path the image was written to, or the string
        ``"Invalid image format"`` when the Content-Type header is missing or
        does not name one of the accepted image types.
    """
    # "tiff" is the registered MIME subtype; "tif" is kept for compatibility.
    accepted = ("jpeg", "jpg", "png", "tif", "tiff", "svg")

    response = requests.get(url)

    # Content-Type may carry parameters ("image/png; charset=binary") and a
    # structured-syntax suffix ("image/svg+xml"); keep only the bare subtype.
    content_type = response.headers.get("content-type", "")
    media_type = content_type.split(";", 1)[0].strip().lower()
    extension = media_type.partition("/")[2].split("+", 1)[0]

    if extension not in accepted:
        return "Invalid image format"

    if not path:
        path = "/tmp/image.{}".format(extension)

    # Reuse the body that was already downloaded instead of requesting the
    # URL a second time, and close the file handle deterministically.
    with open(path, "wb") as image_file:
        image_file.write(response.content)
    return path
|
|
|
|
|
|
def generate_embed(
    embed_url=None,
    embed_title=None,
    embed_description=None,
    embed_color=None,
    author_name=None,
    author_image=None,
):
    """
    Build a "rich" discord embed, optionally with an image and an author.

    Args:
        embed_url: Image URL. When given it is set as the embed image and,
            if no description was supplied, used for a "Direct Link"
            markdown description.
        embed_title: Title passed to ``discord.Embed``.
        embed_description: Description passed to ``discord.Embed``.
        embed_color: Embed color; ``discord.Color.gold()`` is used when falsy.
        author_name: Author name; the author is only set when this or
            ``author_image`` is truthy.
        author_image: Icon URL shown next to the author.

    Returns:
        The populated ``discord.Embed``.
    """
    if embed_url and not embed_description:
        embed_description = "[Direct Link]({})".format(embed_url)

    embed = discord.Embed(
        title=embed_title,
        description=embed_description,
        color=embed_color or discord.Color.gold(),
        type="rich",
    )

    if embed_url:
        embed.set_image(url=embed_url)

    has_author = author_image or author_name
    if has_author:
        embed.set_author(name=author_name, icon_url=author_image)

    return embed
|
|
|
|
|
|
def gen_username():
    """Fetch one generated username from generatorfun.com.

    The generator page is requested with a browser-like User-Agent and parsed
    as HTML.

    Returns:
        Text of the first ``<p>`` element that follows the
        ``<div id="textblock0">`` element in the response.
    """
    # Imported here rather than at module level, as in the original.
    from bs4 import BeautifulSoup

    generator_url = "https://generatorfun.com/code/model/generatorcontent.php?recordtable=generator&recordkey=960&gen=Y&itemnumber=1&randomoption=undefined&genimage=Yes&geneditor=No&nsfw=undefined&keyword=undefined&searchfilter=&searchfilterexclude=&tone=Normal&prefix=None&randomai=undefined"
    browser_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64)"}

    page = requests.get(generator_url, headers=browser_headers)
    document = BeautifulSoup(page.text, "html.parser")

    text_block = document.find("div", {"id": "textblock0"})
    return text_block.find_next("p").text
|
|
|
|
|
|
def waifu_pics(endpoint, nsfw=False):
    """Return the image URL that api.waifu.pics reports for ``endpoint``.

    Args:
        endpoint: Path segment appended after the ``sfw``/``nsfw`` segment.
        nsfw: When truthy, query the ``nsfw`` path instead of ``sfw``.

    Returns:
        The value stored under ``"url"`` in the JSON response.
    """
    if nsfw:
        pic_type = "nsfw"
    else:
        pic_type = "sfw"

    api_url = "https://api.waifu.pics/%s/%s" % (pic_type, endpoint)
    body = requests.get(api_url).json()
    return body["url"]
|
|
|
|
|
|
async def send_alert(self, channel, embed):
    """Send ``embed`` to the channel that ``self.bot`` resolves for ``channel``.

    Args:
        self: Object exposing a ``bot`` attribute with ``get_channel``.
        channel: Value handed to ``self.bot.get_channel``.
        embed: Embed passed to the channel's ``send`` as ``embed=``.
    """
    target = self.bot.get_channel(channel)
    await target.send(embed=embed)
|
|
|
|
|
|
def build_alert_embed(
    color, thumbnail=None, author=None, image=None, details=None, link=None
):
    """Assemble the "rich" embed used for alerts.

    Args:
        color: Color passed to ``discord.Embed``.
        thumbnail: Thumbnail URL; only set when truthy.
        author: Name passed to ``set_author`` (always called).
        image: Image URL; only set when truthy.
        details: Value of the inline "Details" field.
        link: Value of the non-inline "LINK" field.

    Returns:
        The populated ``discord.Embed``.
    """
    embed = discord.Embed(description="-------", color=color, type="rich")

    if thumbnail:
        embed.set_thumbnail(url=thumbnail)
    embed.set_author(name=author)
    if image:
        embed.set_image(url=image)

    # Both fields are always added, in this order.
    field_specs = (
        ("Details", details, True),
        ("LINK", link, False),
    )
    for field_name, field_value, is_inline in field_specs:
        embed.add_field(name=field_name, value=field_value, inline=is_inline)

    return embed
|
|
|
|
|
|
def write_incident_file(file_path, url, details):
    """Write an incident record to ``file_path`` as a JSON object.

    Args:
        file_path: File to create or overwrite.
        url: Stored under the ``"incident_link"`` key.
        details: Stored under the ``"details"`` key.
    """
    record = dict(incident_link=url, details=details)
    with open(file_path, "w") as handle:
        serialized = json.dumps(record)
        handle.write(serialized)
|
|
|
|
|
|
async def send_to_llm(ctx, message):
    """Forward ``message`` to the LLM chat endpoint and return its reply.

    The rules document (``rules.json`` under ``json_endpoint``) supplies the
    system prompt, model name, stop tokens and the list of user IDs that are
    not allowed to use the feature.

    Args:
        ctx: Command context; ``ctx.author.id`` is checked against the
            disallowed list and ``ctx.respond`` is used to refuse.
        message: User text sent as the ``user`` chat message.

    Returns:
        The reply text with the first ``<think>...</think>`` span removed, a
        fixed error string when the chat request or its parsing fails, or
        ``None`` after a refusal has been sent to a disallowed user.
    """
    # ``random`` is not imported at module level in this file; without this
    # import the refusal branch below raises NameError.
    import random

    # NOTE: this is a blocking call inside a coroutine; kept as-is so the
    # exceptions it can raise stay the same for callers.
    llm_rules = requests.get(json_endpoint + "rules.json").json()

    if ctx.author.id in llm_rules["disallowed_users"]:
        responses = [
            "You cant do that right now",
            "You cant use this feature right now",
            "You're on time out from this",
        ]
        await ctx.respond(random.choice(responses))
        return

    url = f"http://{ein_ip}:7869/api/chat"
    instructions = llm_rules["prompt"]

    payload = {
        "messages": [
            {
                "role": "system",
                "content": instructions,
            },
            {
                "content": message,
                "role": "user",
            },
        ],
        "options": {"num_ctx": 1999},
        "model": llm_rules["model"],
        "stream": False,
        "stop": llm_rules["stop_tokens"],
        # "max_tokens": 4096,
        # "frequency_penalty": 0,
        # "presence_penalty": 0,
        # "temperature": 0.7,
        # "top_p": 0.95,
    }
    headers = {"Content-Type": "application/json"}

    try:
        # The context manager closes the client (and its connection pool)
        # even when the request or the JSON parsing fails.
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=payload, headers=headers)
            answer = response.json()["message"]["content"]

        return remove_between(answer, "<think>", "</think>")
    except Exception as e:
        # Best-effort boundary: report a friendly message instead of raising.
        print(e)
        return "Somethings wrong, maybe the LLM crashed"
|