You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

107 lines
4.0 KiB

import asyncio
import json # Note -> need to see how well json scales here!
import os
from datetime import datetime, timedelta, timezone

import fortnite_api
from discord.ext import tasks, commands

import cogs
import utils
class FortniteStoreBot(commands.Bot):
    """Discord bot that caches Fortnite store data and DMs users who watch it.

    State kept on the instance:

    * ``fapi`` -- ``fortnite_api.FortniteItemClient`` built from the API token.
    * ``all_items`` / ``store_today`` -- caches copied from ``fapi.item_list``
      and ``fapi.store`` by :meth:`initialize_cache`.
    * ``store_watch`` -- list of user ids that are sent every store item.
    * ``item_watch`` -- ``{item id: {user id: options}}``; the only option
      read in this class is ``"repeat"``.

    User ids are passed through ``int()`` before being looked up. Both watch
    structures are persisted to ``watch.json``.
    """

    def __init__(self, command_prefix, fortnite_api_token, **options):
        """Create the bot, register its cogs and start with empty caches.

        Args:
            command_prefix: Forwarded to ``commands.Bot``.
            fortnite_api_token: Token given to ``fortnite_api.FortniteItemClient``.
            **options: Forwarded unchanged to ``commands.Bot``.
        """
        super().__init__(command_prefix, **options)
        self.fapi = fortnite_api.FortniteItemClient(fortnite_api_token)
        self.add_cog(cogs.Store(self))
        self.add_cog(cogs.MetaCommands(self))
        self.load_extension("jishaku")
        self.all_items = {}
        self.store_today = {}
        self.store_watch = []
        self.item_watch = {}

    async def start(self, *args, **kwargs):
        """Fill the caches, load the watch lists, start the refresh loop, then
        delegate to ``commands.Bot.start`` with the given arguments."""
        await self.initialize_cache()
        await self.load_watch()
        self.refresh_cache_and_check_up.start()
        await super().start(*args, **kwargs)

    async def _sleep_until(self, hour=0, minute=0):
        """Wait until the specified hour and minute rolls around in UTC.

        Side effects: stores the moment the wait began in ``self.current_time``
        (a naive datetime expressed in UTC) and the whole number of seconds to
        wait in ``self.wait_time``.
        """
        # Take the time in UTC as documented; a plain datetime.now() follows
        # the host's local timezone. tzinfo is dropped so current_time stays a
        # naive datetime, the same type it has always been.
        curr = datetime.now(timezone.utc).replace(tzinfo=None)
        nearest = datetime(curr.year, curr.month, curr.day, hour, minute)
        if nearest < curr:
            # Today's slot has already passed; aim for tomorrow's.
            nearest += timedelta(days=1)
        delay = nearest - curr
        self.current_time = curr
        self.wait_time = delay.seconds
        # total_seconds() keeps the sub-second part so we do not wake early.
        await asyncio.sleep(delay.total_seconds())

    async def initialize_cache(self):
        """Refresh ``all_items`` and ``store_today`` from the API client."""
        self.all_items = self.fapi.item_list
        self.store_today = self.fapi.store

    @tasks.loop(seconds=5.0) # Have d.py handle any and all network errors that could occur here.
    async def refresh_cache_and_check_up(self):
        """Wait for 00:20 UTC, refresh the caches, then notify all watchers."""
        await self._sleep_until(0, 20)
        await self.initialize_cache()
        await self._notify_store_watchers()
        await self._notify_item_watchers()

    async def _notify_store_watchers(self):
        """DM every item in today's store to each user in ``store_watch``."""
        # Iterate over a snapshot: removing from the list being iterated makes
        # the loop skip the entry that follows each removed one.
        for user_id in list(self.store_watch):
            user = self.get_user(int(user_id))
            if not user:
                # Quietly remove the user from store_watch if we can't find them.
                # The membership check covers the list having changed while an
                # earlier send was awaited.
                if user_id in self.store_watch:
                    self.store_watch.remove(user_id)
                    await self.save_watch()
                continue
            for item in self.store_today:
                try:
                    await user.send(embed=utils.generate_item_embed(item, True))
                except Exception:
                    # Best effort: one failed DM must not stop the rest.
                    # Exception (not a bare except) so that task cancellation
                    # and KeyboardInterrupt still propagate.
                    pass

    async def _notify_item_watchers(self):
        """DM users whose watched item is in today's store.

        A watch whose ``"repeat"`` option is falsy is removed after it fires;
        watches of users that cannot be found are removed as well.
        """
        for item in self.store_today:
            # item_watch is loaded from JSON, so its keys are strings; use the
            # same string form for both the membership test and the lookup.
            item_id = str(item["itemId"])
            watchers = self.item_watch.get(item_id)
            if not watchers:
                continue
            # Snapshot the entries: deleting from a dict while iterating it
            # raises RuntimeError, which would stop the whole task loop.
            for user_id, options in list(watchers.items()):
                user = self.get_user(int(user_id))
                if user:
                    try:
                        await user.send("Hi! The item `{}` you asked me to keep an eye out for is now in the store!".format(item["item"]["name"]))
                    except Exception:
                        # Best effort, same reasoning as for store watchers.
                        pass
                    if options["repeat"]:
                        continue
                # Reached for one-shot watches that just fired and for users we
                # can't find. pop() tolerates the entry having been removed
                # elsewhere while the send above was awaited.
                watchers.pop(user_id, None)
                await self.save_watch()

    async def save_watch(self):
        """Write ``store_watch`` and ``item_watch`` to ``watch.json``."""
        json_dict = {
            "store_watch": self.store_watch,
            "item_watch": self.item_watch
        }
        with open("watch.json", "w") as watchfile:
            json.dump(json_dict, watchfile)

    async def load_watch(self):
        """Load ``store_watch`` and ``item_watch`` from ``watch.json``.

        If the file does not exist it is first created from the current
        in-memory values.
        """
        if not os.path.isfile("watch.json"):
            await self.save_watch()
        with open("watch.json", "r") as watchfile:
            json_dict = json.load(watchfile)
        self.store_watch = json_dict["store_watch"]
        self.item_watch = json_dict["item_watch"]
def load_config() -> dict:
    """Read ``config.json`` from the current working directory.

    Returns:
        The parsed JSON content of the file.

    Raises:
        SystemExit: With exit code 1, after printing a hint, when
            ``config.json`` is not an existing regular file.
    """
    if not os.path.isfile("config.json"):
        print("Set up the config.json")
        # The bare exit() helper is injected by the site module for interactive
        # use and is missing under `python -S` or in frozen builds; raising
        # SystemExit works everywhere and needs no extra import.
        raise SystemExit(1)
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open("config.json", "r", encoding="utf-8") as configfile:
        return json.load(configfile)
def main():
    """Load the configuration, build the bot with the ``f!`` prefix and run it.

    Reads the ``fapi_token`` and ``bot_token`` keys from the loaded config.
    """
    settings = load_config()
    bot = FortniteStoreBot(
        command_prefix="f!",
        fortnite_api_token=settings["fapi_token"],
    )
    bot.run(settings["bot_token"])


# Start the bot only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()