Escape From Tarkov Craft Profit Counter
About
Pipeline in this example is inspired by game Escape from Tarkov. It is a realistic FPS game. Beside shooting enemies, Players can earn and sell items in a game market which is driven by players themselves. The price of each item is changing in real-time based on Demand-supply mechanics. Another important game mechanic is that players can create the items themself in their specific stations. Items created and required for each crafts can be bought on the market, so players can earn in-game money by producing the items. Because price of each item is not stable, some crafts are more profitable than others. My idea was to take data from an API source that gives information of all available crafts players can do together with price of each item. I will use this data to sort and analyze the data and output them in form that might help players know which crafts is more profitable and suitable.
In this example I will show you a process of creating pipeline with a bit more complicated use. You will learn about creating a source that enables us to use query in our API requests.
Source
First we have to create our source to pump the data to the pipeline. We will be using aiohttp library for our custom source. We will start by creating our source class. As you can see in the code below.
class IOHTTPSource(bspump.TriggerSource):
def __init__(self, app, pipeline, choice=None, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
async def cycle(self):
async with aiohttp.ClientSession() as session:
async with session.post('https://tarkov-tools.com/graphql', json={'query': query}) as response:
if response.status == 200:
event = await response.json()
else:
raise Exception("Query failed to run by returning code of {}. {}".format(response.status, query))
await self.process(event)
As you can see in the cycle method. We are using asynchronous functions for the API requests. As you can see in the code I am creating Session which is used in aiohttp for more information check AIOHTTP Documentation. I am using post method with a query parameter as seen below.
query = """
query {
crafts {
source
duration
rewardItems {
quantity
item {
shortName
lastLowPrice
}
}
requiredItems {
quantity
item {
shortName
lastLowPrice
}
}
}
}
"""
I created this query using playground interface made by the API authors. Here is the link if you would like to use this API.
Now you can try to copy-paste the code below and try it for yourself.
#!/usr/bin/env python3
import aiohttp
import bspump
import bspump.common
import bspump.http
import bspump.trigger
import pandas as pd
import bspump.file
query = """
query {
crafts {
source
duration
rewardItems {
quantity
item {
shortName
lastLowPrice
}
}
requiredItems {
quantity
item {
shortName
lastLowPrice
}
}
}
}
"""
class IOHTTPSource(bspump.TriggerSource):
def __init__(self, app, pipeline, choice=None, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
async def cycle(self):
async with aiohttp.ClientSession() as session:
async with session.post('https://tarkov-tools.com/graphql', json={'query': query}) as response:
if response.status == 200:
event = await response.json()
else:
raise Exception("Query failed to run by returning code of {}. {}".format(response.status, query))
await self.process(event)
class SamplePipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
IOHTTPSource(app, self).on(bspump.trigger.PeriodicTrigger(app, 5)),
bspump.common.PPrintProcessor(app,self),
bspump.common.NullSink(app, self),
)
If everything works correctly, you should be getting similar output.
'source': 'Workbench level 3'},
{'duration': 60000,
'requiredItems': [{'item': {'lastLowPrice': 39000,
'shortName': 'Eagle'},
'quantity': 2},
{'item': {'lastLowPrice': 15000,
'shortName': 'Kite'},
'quantity': 2}],
'rewardItems': [{'item': {'lastLowPrice': None,
'shortName': 'BP'},
'quantity': 120}],
'source': 'Workbench level 3'},
{'duration': 61270,
'requiredItems': [{'item': {'lastLowPrice': 15000,
'shortName': 'Kite'},
'quantity': 2},
{'item': {'lastLowPrice': 39000,
'shortName': 'Eagle'},
'quantity': 2},
{'item': {'lastLowPrice': 31111,
'shortName': 'Hawk'},
'quantity': 2}],
'rewardItems': [{'item': {'lastLowPrice': None,
'shortName': 'PPBS'},
'quantity': 150}],
.
.
.
There are probably hundreds of JSON lines in your console right now. It is not a nice way to output your data right? Let’s implement our filter processor then.
Filter Processor
This filter processor is used for very specific use-case in this example. The goal as you can remember was to filter incoming data. The goal is to create a dataframe that contains data where each row has information about station in which the craft is created, duration of the craft ,price of items needed to perform the craft, name and price of item/s that we obtain by the craft, profit of the craft, and profit per hour. As you can see there is a lot of indexes we have to create.
class FilterByStation(bspump.Processor):
def __init__(self, app, pipeline, id=None, config=None):
super().__init__(app, pipeline, id=None, config=None)
def process(self, context, event):
my_columns = ['station', 'name', 'output_price_item', 'duration', 'input_price_item', 'profit', 'profit_per_hour']
df = pd.DataFrame(columns=my_columns)
for item in event["data"]["crafts"]:
duration = round((item["duration"])/60/60, ndigits=3)
reward = item["rewardItems"][0]
name_output = reward["item"]["shortName"]
quantity = reward["quantity"]
output_item_price = reward["item"]["lastLowPrice"]
if output_item_price is None: # checks for NULL values
output_item_price = 0
output_price_item = quantity * int(output_item_price)
station_name = item["source"]
profit = 0
profit_p_hour = 0
input_price_item = 0
for item2 in range(len(item["requiredItems"])):
required_item = item["requiredItems"][item2]
quantity_i = required_item["quantity"]
input_item = required_item["item"]["lastLowPrice"]
if input_item is None:
input_item = 0
price_of_input_item = input_item * quantity_i
input_price_item = input_price_item + price_of_input_item
profit = output_price_item - input_price_item
profit_p_hour = round(profit / duration, ndigits=3)
df = df.append(
pd.Series([station_name,
name_output,
output_price_item,
duration,
input_price_item,
profit,
profit_p_hour],
index=my_columns), ignore_index=True)
event = df
return event
You can copy-paste the code above and everything should work just fine. Don’t forget to reference the processor in the self.build() method.
class SamplePipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
IOHTTPSource(app, self).on(bspump.trigger.PeriodicTrigger(app, 5)),
FilterByStation(app, self),
bspump.common.PPrintProcessor(app, self),
bspump.common.NullSink(app, self),
)
If you want more detail of what it does. It firstly goes through the whole json, then it gets data for each of the index if possible (otherwise zero is used instead of null), and appends the record as a row in our dataframe. I am using Pandas in this example. If you are not familiar with Pandas make sure you checked their Documentation
Now output in your console should like like this:
station name output_price_item duration input_price_item profit profit_per_hour
0 Booze generator level 1 Moonshine 286999 3.056 236998 50001 16361.584
1 Intelligence Center level 2 Flash drive 180000 34.222 151498 28502 832.856
2 Intelligence Center level 2 Virtex 88000 37.611 210993 -122993 -3270.134
3 Intelligence Center level 2 SG-C10 130000 38.889 206978 -76978 -1979.429
4 Intelligence Center level 2 RFIDR 215000 53.333 40000 175000 3281.271
.. ... ... ... ... ... ... ...
128 Workbench level 3 PBP 0 11.972 265888 -265888 -22209.155
129 Workbench level 3 M995 0 15.994 211000 -211000 -13192.447
130 Workbench level 3 M61 0 16.644 233331 -233331 -14018.926
131 Workbench level 3 BP 0 16.667 108000 -108000 -6479.870
132 Workbench level 3 PPBS 0 17.019 170222 -170222 -10001.880
[133 rows x 7 columns]
We can agree that this looks much more better than raw JSON, but this is not the end we still need to send the data somewhere for out bot
Dataframe to csv Processor
To make the data available for our Discord bot, we will save them to a directory as a csv file. This processor is really simple as we call only one function from the Pandas library.
You can copy paste the code of the processor
class DataFrameToCSV(bspump.Processor):
def __init__(self, app, pipeline, id=None, config=None):
super().__init__(app, pipeline, id=None, config=None)
def process(self, context, event):
event.to_csv('./Data/TarkovData.csv', index=False)
return event
Once again dont forget to include the processor in our self.build() method.
class SamplePipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
IOHTTPSource(app, self).on(bspump.trigger.PeriodicTrigger(app, 5)),
FilterByStation(app, self),
bspump.common.PPrintProcessor(app, self),
DataFrameToCSV(app, self),
bspump.common.NullSink(app, self),
)
This wont change our output in console, but it should create a csv file in your current directory.
What next
Now we have a function pipeline. You can do anything with the output data. For example, I created a simple discord bot that sends a message with the updated data you can try to make your own discord bot using this tutorial: Getting Started with Discord Bots.