Dekel R
11/16/2021, 2:32 PM
init_logger()
log = logging.getLogger(LOGGER_NAME)


def data_flow():
    log.info('Running the data api workflow')
    with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1/something/xx",
                                                     dockerfile="./Dockerfile", stored_as_script=True,
                                                     path='path_to_the_current_file.py')) as flow:
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
    return flow


def run_local():
    flow = data_flow()
    flow.run()


def run_remotely():
    flow = data_flow()
    client = prefect.Client(api_key=prefect_token)
    client.register(flow, project_name=PREFECT_PROJECT_NAME)


if __name__ == '__main__':
    try:
        start_date, end_date, token, prefect_token = get_workflow_env_vars()
        if prefect_token:
            run_remotely()
        else:
            run_local()
    except Exception as e:
        log.error(e, exc_info=True)
Now I’m running the script and most of the build goes as planned, but then I get this error (after “init_logger()” runs as expected):
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 150, in <module>
flows = import_flow_from_script_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 63, in import_flow_from_script_check
flows.append(extract_flow_from_file(file_path=flow_file_path))
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 104, in extract_flow_from_file
raise ValueError("No flows found in file.")
ValueError: No flows found in file.
This is my Dockerfile:
FROM python:3.8-slim
WORKDIR /app
COPY ./requirements.txt /app/requirements.txt
RUN pip install -r requirements.txt
ENV PYTHONPATH /app/
COPY ./ /app/
The Dockerfile is in the same directory as the file I posted at the top.
Anyone had this issue? Any help will be appreciated.
Thanks!
Anna Geller
When you use stored_as_script=True, you need to add the flow to the storage explicitly, as shown in this example on line 44: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py#L44
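A minimal sketch of that pattern (the registry URL, flow name, and in-image path below are placeholders for illustration, not the linked file verbatim):
from prefect import Flow
from prefect.storage import Docker

storage = Docker(
    registry_url="us-central1/something/xx",   # placeholder registry
    dockerfile="./Dockerfile",
    stored_as_script=True,
    path="/app/flow.py",                       # assumed path of this file *inside* the image
)

with Flow("data_to_parquet_flow", storage=storage) as flow:
    ...  # tasks go here

# with script storage the flow has to be attached to the storage explicitly
storage.add_flow(flow)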
Anna Geller
stored_as_script=True, path='path_to_the_current_file.py'
To understand the difference between pickle and script storage in Docker storage, you can compare those two examples:
1. Script: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py
2. Pickle: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/docker_pickle_docker_run_local_image.py
Dekel R
11/16/2021, 4:05 PM
Anna Geller
Dekel R
11/17/2021, 1:26 PM
with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1-XXX/project/directory/",
                                                 dockerfile="./Dockerfile")) as flow:
    start_date = Parameter('start_date', default='10-11-21')
    end_date = Parameter('end_date', default='10-11-21')
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)

client = prefect.Client()
client.register(flow, project_name=PREFECT_PROJECT_NAME)
I also use Prefect Parameters now.
I have 2 additional questions:
1. When running my current code locally it works just fine and finishes the flow, but when running from Prefect Cloud (run, add some parameters and start the job) I get the following error:
Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'setup'")
Are you familiar with that issue? I can't seem to find any answer online.
2. I want to schedule this job on a daily basis and change the Parameters based on the time of each run (I'm basically trying to get the current day's data from an api at the end of that day), so today start_date will be 16-11-21 and tomorrow 17-11-21. What's the best way to do that with Prefect? Also, what happens when Prefect misses a run (for any reason)? The behavior I'm expecting is a rerun with the exact same parameters (so if I missed 1 week, Prefect will spawn 7 flows, each one with a different day as the date).
Thanks again.
Anna Geller
"if I missed 1 week Prefect will spawn 7 flows, each one with a different day as the date"
You could have a look at the Scheduler docs: the scheduler will never create runs that were scheduled to start in the past. This means that you would then need to explicitly trigger 7 runs, each with a different parameter value for the start date. You can trigger those runs either manually from the UI or from Python code, e.g. using the create_flow_run task.
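For the daily case, one common Prefect 1.x pattern (shown here only as a hedged sketch, not taken from this thread; the date format assumes the '10-11-21' style used above) is to resolve the date inside a task from the run's scheduled start time:
import prefect
from prefect import task, Flow, Parameter

@task
def resolve_date(date_param):
    # use the explicit parameter if given, otherwise fall back to the scheduled run time
    if date_param:
        return date_param
    return prefect.context.get("scheduled_start_time").strftime("%d-%m-%y")

with Flow("data_to_parquet_flow") as flow:
    start_date = resolve_date(Parameter("start_date", default=None))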
Dekel R
11/17/2021, 2:04 PM
Anna Geller
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run

project_name = "your_project_name"

with Flow("backfill-last-week") as flow:
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-11-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-12-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-13-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-14-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-15-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-16-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-17-2021"))
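If the list of dates grows, the same backfill can likely be expressed with task mapping; a sketch assuming the same placeholder flow and project names:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

project_name = "your_project_name"
dates = ["11-11-2021", "11-12-2021", "11-13-2021", "11-14-2021",
         "11-15-2021", "11-16-2021", "11-17-2021"]

with Flow("backfill-last-week") as flow:
    # one child flow run per date instead of seven explicit calls
    create_flow_run.map(
        parameters=[dict(start_date=d) for d in dates],
        flow_name=unmapped("your_flow"),
        project_name=unmapped(project_name),
    )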
Dekel R
11/17/2021, 2:35 PM
import logging
from logger import init_logger
from prefect import Flow, Parameter
from prefect.storage import Docker
import os
import prefect
from config import LOGGER_NAME, PREFECT_PROJECT_NAME
from tasks.data_fetcher import fetch_data
from tasks.data_parser import parse_jsons_to_parquet

init_logger()
log = logging.getLogger(LOGGER_NAME)
log.info('Running the api workflow')

with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1-XXX/project/directory/",
                                                 dockerfile="./Dockerfile")) as flow:
    start_date = Parameter('start_date', default='10-11-21')
    end_date = Parameter('end_date', default='10-11-21')
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)

client = prefect.Client()
client.register(flow, project_name=PREFECT_PROJECT_NAME)
The flow itself is a little complicated so it's hard for me to share it, but I hope the explanation I just gave is good enough.
Scheduling-wise, understood, we will probably implement this somehow in order to rerun more easily.
Thanks
Anna Geller
log.info('Running the api workflow')
could be defined in the first task, if you need this log line.
Basically, everything that is not inside a task will be interpreted and pickled at registration time.
And regarding the logger: to see your logs in the Prefect UI, you should leverage the Prefect logger, which you can get from the context:
import prefect
from prefect import task

@task
def your_task():
    logger = prefect.context.get("logger")
    logger.info("Log msg from a task")
Dekel R
11/17/2021, 2:50 PM
Anna Geller
logger = prefect.context.get("logger")
It doesn’t create a new logger object, it only grabs the already existing Prefect logger from the context.
Dekel R
11/17/2021, 2:54 PM
init_logger()
log = logging.getLogger(LOGGER_NAME)
In the rest of my files (inside the tasks) I’m only running this -
log = logging.getLogger(LOGGER_NAME)
So do you think I need to now run “init_logger” in each one of my tasks (for different loggers) and then use that separate logger for each task?
Please see the code of “init_logger” (nothing special)
import logging
from config import LOGGER_NAME


def init_logger():
    logger = logging.getLogger(LOGGER_NAME)
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())
    logger.info(f'initialized logger {LOGGER_NAME}')
Dekel R
11/17/2021, 2:55 PM
Anna Geller
export PREFECT__LOGGING__LEVEL="DEBUG"
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s | %(message)s"
Dekel R
11/17/2021, 3:20 PM
Anna Geller
Dekel R
11/17/2021, 3:30 PM
Dekel R
11/17/2021, 3:58 PM
Anna Geller
Dekel R
11/17/2021, 4:03 PM
from prefect import Flow, Parameter
from prefect.storage import Docker
import prefect
import os
from config import PREFECT_PROJECT_NAME
from tasks.data_fetcher import fetch_data
from tasks.data_parser import parse_jsons_to_parquet

with Flow("weather_data_to_parquet_flow", storage=Docker(registry_url="us-central1-XX/project_name4/data-pipelines/",
                                                          dockerfile="./Dockerfile")) as flow:
    start_date = Parameter('start_date')
    end_date = Parameter('end_date')
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)

# flow.run(parameters=dict(start_date=os.environ.get('START_DATE'),
#                          end_date=os.environ.get('END_DATE')))

client = prefect.Client()
client.register(flow, project_name=PREFECT_PROJECT_NAME)
Anna Geller
If you use os.environ.get('START_DATE'), you would need to make sure that those env variables exist both in the environment from which you register your flows (at registration time) and in the execution environment.
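One hedged way to provide those variables to the execution environment is through the flow's run_config (DockerRun here is an assumption about the agent type; KubernetesRun and the other run configs accept env the same way):
from prefect.run_configs import DockerRun

# placeholder values; set these to whatever the fetch tasks expect
flow.run_config = DockerRun(env={"START_DATE": "16-11-21", "END_DATE": "16-11-21"})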
I think the best way to test it is to use a super simple basic flow. E.g.
from prefect import task, Flow
from prefect.storage import Docker

@task(log_stdout=True)
def hello_world():
    print("hello world")

with Flow("weather_data_to_parquet_flow", storage=Docker(registry_url="us-central1-XX/project_name4/data-pipelines/",
                                                          dockerfile="./Dockerfile")) as flow:
    hw = hello_world()
If this works, then you know that it’s something in the flow definition that doesn’t work.
Btw, did you know that Prefect has a CLI to register flows? It's much easier than using a client.
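For reference, the registration CLI looks roughly like this (project name and file path are placeholders):
prefect register --project "your_project_name" -p flow.py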