# ask-community
d
Hey everyone, I wrote a pretty simple first flow - getting data from an API, saving it, then parsing it (as another task) and saving the parsed data. My code uses some env vars and then goes to the part where I actually build the flow (both are in the same .py file). I also want to package the whole thing in a Docker image - please see some snippets (the important sections are “run_remotely” and “data_flow”):
Copy code
init_logger()
log = logging.getLogger(LOGGER_NAME)


def data_flow():
    log.info('Running the data api workflow')
    with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1/something/xx",
                                                             dockerfile="./Dockerfile", stored_as_script=True,
                                                             path='path_to_the_current_file.py')) as flow:
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
    return flow


def run_local():
    flow = data_flow()
    flow.run()


def run_remotely():
    flow = data_flow()
    client = prefect.Client(api_key=prefect_token)
    client.register(flow, project_name=PREFECT_PROJECT_NAME)


if __name__ == '__main__':
    try:
        start_date, end_date, token, prefect_token = get_workflow_env_vars()
        if prefect_token:
            run_remotely()
        else:
            run_local()
    except Exception as e:
        log.error(e, exc_info=True)
Now I'm running the script, and most of the build goes as planned, but then I get this error (after “init_logger()” runs as expected):
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 150, in <module>
flows = import_flow_from_script_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 63, in import_flow_from_script_check
flows.append(extract_flow_from_file(file_path=flow_file_path))
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 104, in extract_flow_from_file
raise ValueError("No flows found in file.")
ValueError: No flows found in file.
This is my Dockerfile:
Copy code
FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt
RUN pip install -r requirements.txt

ENV PYTHONPATH /app/

COPY ./ /app/
It's in the same directory as the file I posted at the top. Has anyone had this issue? Any help would be appreciated. Thanks!
a
@Dekel R there are a couple of issues here:
1. Do you want to build a new, updated version of the Docker image every time you register your flow and let Prefect handle building the image and pushing it to the container registry, or do you prefer to build it once on your own and copy the flow file into it? This is important to know before we can tell whether storing the flow as a script makes sense in your use case.
2. Do your custom code dependencies and package requirements change frequently, or are they rather stable? This can tell whether Docker storage is useful for you.
In addition to those questions, some things I noticed:
a) You use a Python base image instead of an official Prefect base image - I believe it's worth switching to a Prefect base image; those are more lightweight (based on the Python slim base image) and optimized to run Prefect flows.
b) When you use Docker with stored_as_script=True, you need to add the flow to the storage explicitly, as shown in this example on line 44: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py#L44
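For reference, a rough sketch of that script-storage pattern, based on the linked example (image path and registry below are placeholders, not taken from this thread):
Copy code
from prefect import Flow
from prefect.storage import Docker

with Flow("data_to_parquet_flow") as flow:
    ...  # your tasks here

# with stored_as_script=True, the flow has to be attached to the storage explicitly
flow.storage = Docker(
    registry_url="us-central1/something/xx",  # placeholder registry
    stored_as_script=True,
    path="/app/flow_file.py",  # where the flow file lives inside the image
)
flow.storage.add_flow(flow)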
In general, I think that if you switch to a Prefect base image and remove those two lines, this should fix the issue for you:
Copy code
stored_as_script=True,
path='path_to_the_current_file.py'
To understand the difference between pickle and script storage in Docker storage, you can compare these two examples:
1. Script: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py
2. Pickle: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/docker_pickle_docker_run_local_image.py
d
Thanks for the quick reply. First of all - I changed the Docker image to the base Prefect one and deleted the rows you suggested - and it's working now - thanks!! And for the questions: 1. Yes. I will have a CI/CD flow (not a Prefect flow (-: ) that will update the Docker image version after each merge to “main”. 2. Yes, my dependencies may change frequently - and in general, we are planning on using Prefect for all of our pipelines (potentially hundreds of them), so using Docker will eliminate various compatibility issues - that's why I chose it. Do you think another strategy would fit better?
a
@Dekel R that's great to hear! The reason I asked those questions was to determine what is changing more frequently: 1. Only your flow files, or 2. Both your flow definitions and your custom modules and third-party libraries. If #1 were true, it could be easier to switch to some other storage class, such as one of the Git storage classes (GitHub, Bitbucket, GitLab, CodeCommit, Git) or one of the cloud storage classes (e.g. S3, GCS, Azure). But if #2 is true (and you confirmed it is in your use case), then Docker storage with the pickled flow option indeed makes sense, because all those custom modules that change frequently are baked into the same image as your flow file, and the image can be built each time at registration.
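For reference, a minimal sketch of what a Git-based storage class could look like if only the flow files were changing (repo and path below are placeholders):
Copy code
from prefect import Flow
from prefect.storage import GitHub

with Flow("data_to_parquet_flow") as flow:
    ...  # your tasks here

# the flow file is pulled from the repository at runtime instead of being baked into an image
flow.storage = GitHub(
    repo="your-org/your-repo",
    path="flows/data_to_parquet.py",
)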
d
I see. Great, thank you!! I've now made some changes to my code to simplify it. This is the current script -
Copy code
with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1-XXX/project/directory/",
                                                         dockerfile="./Dockerfile")) as flow:
    start_date = Parameter('start_date', default='10-11-21')
    end_date = Parameter('end_date', default='10-11-21')
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)

client = prefect.Client()
client.register(flow, project_name=PREFECT_PROJECT_NAME)
I also use Prefect Parameters now. I have 2 additional questions: 1. When running my current code locally, it works just fine and finishes the flow - but when running from Prefect Cloud (run, add some parameters and start the job) I get the following error:
Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'setup'")
Are you familiar with that issue? I can't seem to find any answer online. 2. I want to schedule this job on a daily basis and change the Parameters based on the time of each run (I'm basically trying to get data for the current day from an API at the end of that day) - so today start_date will be 16-11-21 and tomorrow 17-11-21. What's the best way to do that with Prefect? Also - what happens when Prefect misses a run (for any reason)? The behavior I'm expecting is a rerun with the exact same parameters (so if I missed 1 week, Prefect would spawn 7 flows, each one with a different day as the date). Thanks again.
a
@Dekel R 1. Looks like something in your flow definition is not serializable with cloudpickle - since you currently use Docker storage with a pickled flow inside, you need to make sure that all objects are serializable and that all custom modules are within the image. Do you have some task that returns a database connection or an HTTP client? 2. You shouldn't use Parameter defaults for this. Parameter defaults are frozen at registration time, which would have the effect that you would load the data from 10-11 every day 🙂 What you can do instead is have a separate task that returns the current date, e.g. pendulum.today(), and then use a Parameter to optionally override that default value. So this would be: if parameter_value is None, use the return value of the task that returns today (or yesterday's date in your case),
and the parameter default would then be None or an empty string
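A minimal sketch of that pattern (task and flow names below are illustrative, not from this thread):
Copy code
import pendulum
from prefect import task, Flow, Parameter


@task
def get_date(date_value: str = None) -> str:
    # fall back to today's date when no parameter value was provided at runtime
    if date_value is None:
        return pendulum.today().format("DD-MM-YY")
    return date_value


with Flow("date_default_example") as flow:
    start_date_param = Parameter("start_date", default=None)
    start_date = get_date(start_date_param)
    # downstream tasks would consume start_date here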
Regarding:
if I missed 1 week Prefect will spawn 7 flows, each one with a different day as the date
you could have a look at the Scheduler docs: the scheduler will never create runs that were scheduled to start in the past. This means that you would need to explicitly trigger 7 runs, each with a different parameter value for the start date. You can trigger those runs either manually from the UI or from Python code, e.g. using the create_flow_run task.
d
Hey, 1. My first task returns a string; the second one just writes some files to Google Cloud Storage. I do use an HTTP client in the task and another Google Cloud Storage client - but I'm not returning them. 2. I'll change the default parameters. 3. OK, is this feature on Prefect's roadmap? Rerunning missed tasks with their original parameters.
a
1. Is this HTTP client defined globally and shared between tasks? This error looked like some serialization issue, but it's hard to tell without seeing it. Can you share your flow definition or build a minimal example I could reproduce? 2. Nice. 3. Do you mean a feature to backfill data based on past schedule intervals? If so, then no, because this is not how the Scheduler service works. But Prefect provides you with building blocks to backfill data as you wish. Taking the use case you provided:
Copy code
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run

project_name = "your_project_name"

with Flow("backfill-last-week") as flow:
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-11-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-12-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-13-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-14-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-15-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-16-2021"))
    create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-17-2021"))
d
1. The HTTP client and the Google Cloud Storage client are not defined globally; each of the tasks defines a client for itself. In the first task I create a new HTTP client and get data from an API with it, then I use a Google Storage client to save the data - and I only return a string. The second task actually opens a new Google Storage client, reads the data and writes it out in Parquet format. The only thing that gets passed between tasks is a string - “json_bucket_prefix”. I can only think of one thing that both of the tasks are using - my logger - do you think it might be the issue? I define a logger at the top of this file and both tasks use it -
Copy code
import logging
from logger import init_logger
from prefect import Flow, Parameter
from prefect.storage import Docker
import os
import prefect

from config import LOGGER_NAME, PREFECT_PROJECT_NAME
from tasks.data_fetcher import fetch_data
from tasks.data_parser import parse_jsons_to_parquet


init_logger()
log = logging.getLogger(LOGGER_NAME)


log.info('Running the api workflow')
with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1-XXX/project/directory/",
                                                         dockerfile="./Dockerfile")) as flow:
    start_date = Parameter('start_date', default='10-11-21')
    end_date = Parameter('end_date', default='10-11-21')
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)

client = prefect.Client()
client.register(flow, project_name=PREFECT_PROJECT_NAME)
The flow itself is a little complicated, so it's hard for me to share it; I hope the explanation I just gave is good enough. Scheduling-wise - understood, we will probably implement something like this so we can rerun more easily. Thanks
a
@Dekel R I think you're right, the logger looks like it may be the culprit here. In general, you should move all logging calls into the tasks; e.g. this line:
Copy code
log.info('Running the api workflow')
could be defined in the first task, if you need this log line. Basically, everything that is not inside a task will be interpreted and pickled at registration time. And regarding the logger: to see your logs in the Prefect UI, you should leverage the Prefect logger, which you can get from the context:
Copy code
import prefect
from prefect import task


@task
def your_task():
    logger = prefect.context.get("logger")
    logger.info("Log msg from a task")
d
Do I need to use a different logger for each task too? Or will a single logger work as long as the logs are only inside the tasks?
a
if you add this line to each of your tasks that require logging, it will reuse the same logger that Prefect is using for the flow:
Copy code
logger = prefect.context.get("logger")
It doesn’t create a new logger object, it only grabs the already existing Prefect logger from the context.
d
Right now I'm running this code, which initializes the logger -
Copy code
init_logger()
log = logging.getLogger(LOGGER_NAME)
In the rest of my files (inside the tasks) I’m only running this -
Copy code
log = logging.getLogger(LOGGER_NAME)
So do you think I now need to run “init_logger” in each one of my tasks (for different loggers) and then use that separate logger for each task? Please see the code of “init_logger” (nothing special):
Copy code
import logging
from config import LOGGER_NAME


def init_logger():
    logger = logging.getLogger(LOGGER_NAME)
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler())
    logger.info(f'initialized logger {LOGGER_NAME}')
I'll try that (logger = prefect.context.get("logger"))
a
yeah, so I think you could just use this logger from the context and not use the init_logger at all. But if you need to configure the Prefect logger in some specific way, you could do that through environment variables or through config.toml, e.g.
Copy code
export PREFECT__LOGGING__LEVEL="DEBUG"
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s | %(message)s"
d
I did it, but I keep getting the same error. I also have some “common” configuration that both of the tasks use (just a config file with URLs and such) - is that an issue too? Do the tasks have to be completely isolated?
a
@Dekel R btw, many people use the KV Store for backfilling purposes - e.g. you can store the last loaded date as a key-value pair, and any new flow run grabs this value and loads data from there. Here is one example: https://docs.prefect.io/orchestration/concepts/kv_store.html#using-key-value-pairs-in-flows
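A minimal sketch of that pattern (the key name and tasks below are made up for illustration):
Copy code
import pendulum
from prefect import task, Flow
from prefect.backend import get_key_value, set_key_value


@task
def get_last_loaded_date() -> str:
    # read the watermark persisted by the previous run
    return get_key_value(key="last_loaded_date")


@task
def update_last_loaded_date():
    # store today's date so the next run knows where to pick up
    set_key_value(key="last_loaded_date", value=pendulum.today().to_date_string())


with Flow("kv_backfill_example") as flow:
    last_date = get_last_loaded_date()
    # ...load data from last_date up to today, then:
    update_last_loaded_date(upstream_tasks=[last_date])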
d
Nice, seems like a good solution
Do you have any idea of the best way to proceed with this issue? I keep getting the same error.
a
So the logger didn’t fix it? Can you share your most recent flow definition?
d
Nope... still the same error (it runs great locally but not on Prefect Cloud). This is the latest flow definition -
Copy code
from prefect import Flow, Parameter
from prefect.storage import Docker
import prefect
import os
from config import PREFECT_PROJECT_NAME
from tasks.data_fetcher import fetch_data
from tasks.data_parser import parse_jsons_to_parquet

with Flow("weather_data_to_parquet_flow", storage=Docker(registry_url="us-central1-XX/project_name4/data-pipelines/",
                                                         dockerfile="./Dockerfile")) as flow:
    start_date = Parameter('start_date')
    end_date = Parameter('end_date')
    json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
    parse_jsons_to_parquet(files_prefix=json_bucket_prefix)

# flow.run(parameters=dict(start_date=os.environ.get('START_DATE'),
#                          end_date=os.environ.get('END_DATE')))  # parameters=dict(start_date=os.environ.get('START_DATE'), end_date=os.environ.get('END_DATE'))


client = prefect.Client()
client.register(flow, project_name=PREFECT_PROJECT_NAME)
a
Using env variables here may cause trouble too:
Copy code
os.environ.get('START_DATE'),
you would need to make sure that those env variables exist both in the environment from which you register your flows and in the execution environment. I think the best way to test it is to use a super simple basic flow, e.g.:
Copy code
from prefect import task, Flow


@task(log_stdout=True)
def hello_world():
    print("hello world")


with Flow("weather_data_to_parquet_flow", storage=Docker(registry_url="us-central1-XX/project_name4/data-pipelines/",
                                                         dockerfile="./Dockerfile") as flow:
    hw = hello_world()
If this works, then you know that it's something in the flow definition that doesn't work. Btw, did you know that Prefect has a CLI command to register flows? It's much easier than using a Client.
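For example, registration from the CLI might look something like this (project name and path below are placeholders):
Copy code
prefect register --project "your_project_name" -p path/to/your_flow_file.py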