dragon-bot/app/cogs/actual_utils.py

206 lines
6.5 KiB
Python

from discord.ext import commands
from discord import option
import discord
import os
import get_from_reddit
import core_utils
import requests
class ActualUtils(commands.Cog):
def __init__(self, bot):
self.bot: commands.Bot = bot
@commands.slash_command(
guild_ids=None,
name="youtube",
description="Search youtube for the passed in query",
)
@option(
"query",
description="The search string you want to enter on youtube",
input_type="str",
requred=True,
)
async def youtube(self, ctx, query: str):
import re
from urllib import parse, request
query_string = parse.urlencode({"search_query": query})
html_content = request.urlopen("http://www.youtube.com/results?" + query_string)
search_results = re.findall("\/watch\?v=(.{11})", html_content.read().decode())
result = "https://www.youtube.com/watch?v=" + search_results[0]
await ctx.defer()
await ctx.followup.send(result)
@commands.command(name="issue")
async def issue(self, ctx: commands.Context):
import gitlab
await ctx.send(gitlab.parse_message(ctx.message))
@commands.command(name="tts")
async def tts(self, ctx: commands.Context):
import tts
if ctx.message.content.split()[1] == "langs":
await ctx.send("Ok {}, check your DMs".format(ctx.message.author.mention))
return await ctx.message.author.send(tts.get_all_langs())
file_path = tts.text_to_speech(ctx.message.content)
await ctx.send(
file=discord.File(
file_path,
filename="A Message From {}.mp3".format(ctx.message.author.name),
)
)
await ctx.message.delete()
os.remove(file_path)
@commands.command(name="ask", aliases=["wolfram"])
async def ask(self, ctx: commands.Context, *, query):
import questions
await ctx.reply(
questions.answer_question(query),
)
@commands.command(name="openai")
async def openai(self, ctx: commands.Context, *, query):
if ctx.message.author.id != core_utils.my_id:
await ctx.send("Sorry, this is a paid dale-bot feature")
return
import questions
await ctx.reply(
questions.open_ai(query),
)
@commands.has_role("stable-diffuser")
@commands.slash_command(
guild_ids=core_utils.my_guilds,
name="sd",
description="Pass a prompt and optinal negative prompt to stable diffusion",
)
@option(
"positive_prompt",
description="The positive prompt to pass to stable diffusion",
input_type="str",
requred=True,
)
@option(
"negative_prompt",
description="An optional set of negatives you want to pass to stable diffusion",
requred=False,
)
async def sd(
self, ctx: commands.Context, positive_prompt: str, negative_prompt: str
):
if ctx.channel.name == "stable-diffusion":
import socket
import stable_diffusion
port = "7860"
ip = "192.168.1.80"
steps = 20
# Send my requests to my gaming computer with the 3080 (if its up)
if ctx.author.id == core_utils.my_id:
ip = "192.168.1.188"
steps = 60
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
try:
s.connect((ip, int(port)))
except:
ip = "192.168.1.80"
try:
await ctx.defer()
original_message = await ctx.followup.send(
"Please be patient, I'm generating your image"
)
file_path, time_taken = await stable_diffusion.generate_image(
ip=ip,
port=port,
positives=positive_prompt,
negatives=negative_prompt or None,
steps=steps,
)
await original_message.edit(
content=time_taken,
file=discord.File(
file_path,
filename="unknown.png",
),
)
os.remove(file_path)
except Exception as e:
await ctx.reply(
"Stable diffusion isnt running right now, sorry.\n%s" % e,
)
else:
await ctx.respond("You can only do that in the stable-diffusion channel")
@commands.has_role("Track day gamers")
@commands.slash_command(
guild_ids=core_utils.my_guilds,
name="trackdays",
description="Query motorsportsreg.com for a list of trackdays going on at Buttonwillow and Thunderhill",
)
async def trackdays(self, ctx: commands.Context):
import trackdays
shid = await trackdays.get_msreg()
for track, events in shid.items():
embed = discord.Embed(
description=":checkered_flag: **Upcoming events at %s**:checkered_flag: "
% track,
color=0x428BCA,
type="rich",
)
for track_day in events:
embed.add_field(
name="-------------\n**Event Name**",
value=track_day["event_name"],
inline=False,
)
embed.add_field(
name="Date", value=track_day["event_date"], inline=False
)
embed.add_field(
name="Event URL", value=track_day["event_url"], inline=False
)
await ctx.send(embed=embed)
@commands.command(name="corona", aliases=["covid"])
async def corona(self, ctx: commands.Context, *, location=None):
import corona
async with ctx.message.channel.typing():
result = corona.parse_message(location)
await ctx.send(embed=result)
@commands.command(name="stock")
async def stock(self, ctx: commands.Context):
msg = ctx.message.content
if len(msg.split()) < 2:
import help_methods
await ctx.send(help_methods.get_help_message("stock"))
import stock
results = stock.parse_message(msg)
for res in results:
await ctx.reply(embed=res)
def setup(bot):
bot.add_cog(ActualUtils(bot))