Kurt Rhee
08/04/2021, 4:01 PM
Kevin Kho
08/04/2021, 4:05 PM
Kurt Rhee
08/04/2021, 4:06 PM
Kevin Kho
08/04/2021, 4:08 PM cloud.prefect.io, you would create something like an EC2 instance, and then start server there, and then the UI would be `<VM-IP>:8080`.
Kurt Rhee
08/04/2021, 4:08 PM
Kevin Kho
08/04/2021, 4:13 PM
Kurt Rhee
08/04/2021, 5:31 PM
Kevin Kho
08/04/2021, 7:03 PM `prefect backend server`?
Kurt Rhee
08/04/2021, 7:06 PM
Kevin Kho
08/04/2021, 7:08 PM
Kurt Rhee
08/04/2021, 7:08 PM
Kevin Kho
08/04/2021, 7:09 PM
Kurt Rhee
08/04/2021, 7:09 PM
Kevin Kho
08/04/2021, 7:22 PM
Kurt Rhee
08/04/2021, 7:23 PM
Kevin Kho
08/05/2021, 8:57 PM
Kurt Rhee
08/05/2021, 8:58 PM
Kevin Kho
08/05/2021, 8:59 PM
Kurt Rhee
08/05/2021, 9:50 PM
Kevin Kho
08/05/2021, 9:58 PM
Kurt Rhee
08/05/2021, 11:08 PM
Kevin Kho
08/06/2021, 1:14 AM `Storage` class for Prefect, it automatically serializes your file and puts it in the `.prefect` folder. So something with the way you are registering is ruining that path. I think it might be a bit weird that your code is in the `__init__.py`, and I'm not sure why you would do that. What is happening is that the path to the flow in the `.prefect` folder is not working well for you. So here are a couple of things you can do:
1. Use `Local` storage explicitly. There are two arguments you can use. First is the `directory`, so you can control where the pickle is saved. Second, you can point it to a script by using `stored_as_script=True` and then supplying a `path_to_file`, which points to the Python file.
2. After this, you can also use the `LocalRun` to specify a `working_dir` so that if your flow has other imports in the same directory, you can choose the directory to start the flow from.
How did you register this? Using the `flow.register` in the `__init__.py`?
Kurt Rhee
08/06/2021, 2:04 PM
Kevin Kho
08/06/2021, 2:07 PM `Local()` storage will get this, serialize it, and then save it under your `.prefect` folder. When you run this, can you find the file under `flows` under `.prefect` in the home directory?
Kurt Rhee
08/06/2021, 2:32 PM
Kevin Kho
08/06/2021, 2:35 PM `python /path/to/file/__init__.py` or are you in the same folder as the init and it's just `python __init__.py`?
Kurt Rhee
08/06/2021, 2:37 PMflow.run_config = LocalRun()
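To check whether the serialized flow actually exists, as asked above, something along these lines could help. This is a minimal sketch that assumes the default location mentioned in the thread (a `flows` folder under `.prefect` in the home directory); `list_stored_flows` is a hypothetical helper name, not part of Prefect.

```python
from pathlib import Path


def list_stored_flows(prefect_home=None):
    """Return the files found under <prefect_home>/flows, sorted by path.

    Assumption: prefect_home defaults to ~/.prefect, the location
    described in the thread. Pass another directory if yours differs.
    """
    home = Path(prefect_home) if prefect_home else Path.home() / ".prefect"
    flows_dir = home / "flows"
    if not flows_dir.is_dir():
        # Nothing has been saved here (or storage points somewhere else).
        return []
    return sorted(p for p in flows_dir.rglob("*") if p.is_file())


if __name__ == "__main__":
    for path in list_stored_flows():
        print(path)
```

An empty result would suggest that registration saved the flow somewhere else, or did not save it at all.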
Kevin Kho
08/06/2021, 2:38 PM `.prefect` is in the home directory of your machine.
Kurt Rhee
08/06/2021, 2:39 PM
Kevin Kho
08/06/2021, 2:42 PM
Kurt Rhee
08/06/2021, 2:42 PM
Kevin Kho
08/06/2021, 2:43 PM
Kurt Rhee
08/06/2021, 2:43 PM
import os
import json
import warnings
import pendulum
from datetime import datetime, timedelta
from prefect import Flow, Parameter
from prefect.schedules import IntervalSchedule
class ValentineEtl:
    """
    Valentine ETL Class
    """
    def __init__(self):
        # --- original data period ---
        self.start = '2019-09-11 00:00'
        self.end = datetime.now().strftime('%Y-%m-%d %H:%M')
        # --- get localized time ---
        self.site_code = 'VLT1'
        dirname = os.path.dirname(__file__)
        filename = os.path.join(dirname, fr'../intake/config/{self.site_code}.json')
        try:
            self.config = json.load(open(filename))
        except FileNotFoundError:
            self.config = warnings.warn(
                f"Could not find {filename} "
                "Please utilize botany.intake.geocode "
                "to create a config.json file for this project")
    # --- ghi ---
    from botany.etl.generic_ghi import (
        extract_ghi, extract_ghi_tilt,
        transform_ghi,
        load_ghi, load_ghi_tilt
    )
# --- Schedule ---
schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(days=1)
)
# --- Flow ---
with Flow(
        'Valentine_GHI',
        schedule=schedule) as flow:
    # --- Configurations ---
    dag = ValentineEtl()
    # --- Extracts ---
    # tags
    ghi_tags = [
        'VLT1SMET001_Pyranometer_1_SR30_Irradiance', 'VLT1SMET002_Pyranometer_1_SR30_Irradiance',
        'VLT1SMET003_Pyranometer_1_SR30_Irradiance', 'VLT1SMET004_Pyranometer_1_SR30_Irradiance']
    ghi_tilt_tags = [
        'VLT1SMET001_MVTiltAngle', 'VLT1SMET002_MVTiltAngle',
        'VLT1SMET003_MVTiltAngle', 'VLT1SMET004_MVTiltAngle']
    # extractions
    ghi = dag.extract_ghi(dag, ghi_tags)
    ghi_tilt = dag.extract_ghi_tilt(dag, ghi_tilt_tags)
    # --- Transforms ---
    ghi = dag.transform_ghi(dag, ghi, ghi_tilt)
    # --- Loads ---
    dag.load_ghi(dag, ghi)
    dag.load_ghi_tilt(dag, ghi_tilt)
if __name__ == '__main__':
    flow.register(project_name="APPA")
    flow.run()
    flow.visualize()
    pass
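One detail in the constructor above may be worth a second look: `warnings.warn` returns `None`, so when the file is missing `self.config` ends up as `None`, and the handle from `open(filename)` is never closed explicitly. A minimal sketch of a more explicit loader follows; `load_site_config` is a hypothetical helper name, and returning an empty dict for a missing file is an assumption about the desired behavior.

```python
import json
import os
import warnings


def load_site_config(config_dir, site_code):
    """Load <config_dir>/<site_code>.json, or warn and return {} if missing."""
    filename = os.path.join(config_dir, f"{site_code}.json")
    try:
        # The context manager closes the file even if json.load raises.
        with open(filename) as handle:
            return json.load(handle)
    except FileNotFoundError:
        warnings.warn(
            f"Could not find {filename}. "
            "Please utilize botany.intake.geocode "
            "to create a config.json file for this project"
        )
        # Assumption: an empty dict is a safer placeholder than None.
        return {}
```

Inside the class this could be called as `self.config = load_site_config(config_dir, self.site_code)`, with `config_dir` built from the same `../intake/config` path used above.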
Kevin Kho
08/06/2021, 2:45 PM
# --- ghi ---
from botany.etl.generic_ghi import (
    extract_ghi, extract_ghi_tilt,
    transform_ghi,
    load_ghi, load_ghi_tilt
)
`LocalRun(working_dir="dir_above_botany")` so that these imports can work. Is botany a Python module or just a collection of scripts?
Kurt Rhee
08/06/2021, 2:51 PM
Kevin Kho
08/06/2021, 2:53 PM `pip installed` so that it can be used in other projects on the same machine that are in different directories.
Kurt Rhee
08/06/2021, 2:53 PM
Kevin Kho
08/06/2021, 2:56 PM `botany`, that import will work also. It’s just where the Python process starts, or you could install it as a module to make it available no matter where you run the script. The `pip install` adds the project to the path so it can be imported.
Kurt Rhee
08/06/2021, 2:59 PM
if __name__ == '__main__':
    flow.run_config = LocalRun(working_dir=r'../..')
    flow.register(project_name="APPA")
    flow.run()
    flow.visualize()
    pass
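The relative `working_dir` above depends on where the process starts, which is also what decides whether `botany` can be imported. As a rough illustration using only the standard library, the sketch below builds a throwaway package (the name `botany_demo` is made up for this example) and shows that the import succeeds only once the directory above the package is on `sys.path`.

```python
import importlib
import os
import sys
import tempfile


def can_import(module_name):
    """Return True if module_name imports cleanly with the current sys.path."""
    importlib.invalidate_caches()
    sys.modules.pop(module_name, None)  # force a fresh lookup
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


def demo():
    """Try the import before and after adding the parent dir to sys.path."""
    with tempfile.TemporaryDirectory() as parent:
        package_dir = os.path.join(parent, "botany_demo")
        os.makedirs(package_dir)
        with open(os.path.join(package_dir, "__init__.py"), "w") as handle:
            handle.write("VALUE = 1\n")

        before = can_import("botany_demo")  # parent is not on sys.path yet
        sys.path.insert(0, parent)
        try:
            after = can_import("botany_demo")
        finally:
            # Clean up so the demo leaves the interpreter state unchanged.
            sys.path.remove(parent)
            sys.modules.pop("botany_demo", None)
        return before, after


if __name__ == "__main__":
    print(demo())
```

Starting the process from the directory above `botany`, or pip installing the package, are two ways of getting the same effect without editing `sys.path` by hand.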
Kevin Kho
08/06/2021, 3:00 PM
Kurt Rhee
08/06/2021, 3:01 PM
if __name__ == '__main__':
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, r'../..')
    flow.run_config = LocalRun(working_dir=filename)
    flow.register(project_name="APPA")
    flow.run()
    flow.visualize()
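One caveat with the snippet above: `os.path.dirname(__file__)` can come back as an empty string when the script is launched by bare file name from its own folder (on Python versions before 3.9, `__file__` for the main script holds the path as typed). Resolving to an absolute path first avoids handing a relative or empty directory to `LocalRun`. A minimal sketch, where `project_root` is a hypothetical helper and the two-levels-up layout is taken from the snippet above:

```python
import os


def project_root(script_file, levels_up=2):
    """Return the absolute directory levels_up above script_file's folder."""
    # abspath anchors a bare name such as "__init__.py" to the current
    # directory, so dirname never returns an empty string.
    directory = os.path.dirname(os.path.abspath(script_file))
    for _ in range(levels_up):
        directory = os.path.dirname(directory)
    return directory


# Possible usage inside the flow file (requires Prefect, so left as a comment):
#   working_dir = project_root(__file__)
#   flow.run_config = LocalRun(working_dir=working_dir)
```

Printing the result before registering is a cheap way to confirm it points at the directory above `botany`.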
Kevin Kho
08/06/2021, 3:14 PM `os.path.dirname(__file__)` give you though when you print it? It gives me nothing.
Kurt Rhee
08/06/2021, 3:22 PM
Kevin Kho
08/06/2021, 3:23 PM
Kurt Rhee
08/06/2021, 3:31 PM
Kevin Kho
08/06/2021, 3:32 PM
Kurt Rhee
08/06/2021, 3:32 PM
Flow URL: http://localhost:8080/default/flow/cea14a36-26b1-4058-85bc-d9b792aafb08
└── ID: f88652b7-44c5-4487-b66c-a4f16826c1d8
└── Project: APPA
└── Labels: ['sdhqragsolpy02']
[2021-08-06 08:19:06-0700] INFO - prefect.Pendleton_GHI | Waiting for next scheduled run at 2021-08-06T15:19:07.129768+00:00
[2021-08-06 08:19:07-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'Pendleton_GHI'
[2021-08-06 08:19:07-0700] INFO - prefect.TaskRunner | Task 'extract_ghi': Starting task run...
Current Tag: PND1SMET001_GHI
Current Tag: PND1SMET002_GHI
[2021-08-06 08:19:08-0700] INFO - prefect.TaskRunner | Task 'extract_ghi': Finished task run for task with final state: 'Success'
[2021-08-06 08:19:09-0700] INFO - prefect.TaskRunner | Task 'extract_ghi_tilt': Starting task run...
Current Tag: PND1SMET001_GHI_TILT
Current Tag: PND1SMET002_GHI_TILT
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'extract_ghi_tilt': Finished task run for task with final state: 'Success'
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'load_ghi_tilt': Starting task run...
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'load_ghi_tilt': Finished task run for task with final state: 'Success'
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'transform_ghi': Starting task run...
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'transform_ghi': Finished task run for task with final state: 'Success'
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'load_ghi': Starting task run...
[2021-08-06 08:19:10-0700] INFO - prefect.TaskRunner | Task 'load_ghi': Finished task run for task with final state: 'Success'
[2021-08-06 08:19:10-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-08-06 08:19:10-0700] INFO - prefect.Pendleton_GHI | Waiting for next scheduled run at 2021-08-07T15:19:07.129768+00:00
Kevin Kho
08/06/2021, 3:33 PM
Kurt Rhee
08/06/2021, 3:33 PM
Kevin Kho
08/06/2021, 3:34 PM `flow.run()` when you register? You might be running into issues with that. When you register, try having only the registration as the last line of your code.
Kurt Rhee
08/06/2021, 3:36 PM
Kevin Kho
08/06/2021, 3:44 PM
Kurt Rhee
08/06/2021, 4:29 PM