Thread
#prefect-community
    Dekel R

    10 months ago
    Hey everyone, I wrote a pretty simple first flow: getting data from an API, saving it, then parsing it (as another task) and saving the parsed data. My code uses some env vars and then goes to the part where I actually build the flow (both are in the same .py file). I also want to package the whole thing in a Docker image - please see some snippets (the important sections are “run_remotely” and “data_flow”):
    init_logger()
    log = logging.getLogger(LOGGER_NAME)
    
    
    def data_flow():
        log.info('Running the data api workflow')
        with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1/something/xx",
                                                                 dockerfile="./Dockerfile", stored_as_script=True,
                                                                 path='path_to_the_current_file.py')) as flow:
            json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date, token=token)
            parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
        return flow
    
    
    def run_local():
        flow = data_flow()
        flow.run()
    
    
    def run_remotely():
        flow = data_flow()
        client = prefect.Client(api_key=prefect_token)
        client.register(flow, project_name=PREFECT_PROJECT_NAME)
    
    
    if __name__ == '__main__':
        try:
            start_date, end_date, token, prefect_token = get_workflow_env_vars()
            if prefect_token:
                run_remotely()
            else:
                run_local()
        except Exception as e:
            log.error(e, exc_info=True)
    Now I’m running the script and most of the build goes as planned, but then I get this error (after “init_logger()” runs as expected):
    Traceback (most recent call last):
    File "/opt/prefect/healthcheck.py", line 150, in <module>
    flows = import_flow_from_script_check(flow_file_paths)
    File "/opt/prefect/healthcheck.py", line 63, in import_flow_from_script_check
    flows.append(extract_flow_from_file(file_path=flow_file_path))
    File "/usr/local/lib/python3.8/site-packages/prefect/utilities/storage.py", line 104, in extract_flow_from_file
    raise ValueError("No flows found in file.")
    ValueError: No flows found in file.
    This is my Dockerfile:
    FROM python:3.8-slim
    
    WORKDIR /app
    
    COPY ./requirements.txt /app/requirements.txt
    RUN pip install -r requirements.txt
    
    ENV PYTHONPATH /app/
    
    COPY ./ /app/
    The Dockerfile is at the same level of the directory hierarchy as the file I posted at the top. Has anyone had this issue? Any help will be appreciated. Thanks!
    Anna Geller

    10 months ago
    @Dekel R there are a couple of issues here:
    1. Do you want to build a new, updated version of the Docker image every time you register your flow and let Prefect handle building the image and pushing it to the container registry, or do you prefer to build it once on your own and copy the flow file into it? This is important to know before we can tell whether storing the flow as a script makes sense in your use case.
    2. Do your custom code dependencies and package requirements change frequently, or are they rather stable? This can tell us whether Docker storage is useful for you.
    In addition to those questions, some things I noticed:
    a) You use a Python base image instead of an official Prefect base image - I believe it’s worth switching to a Prefect base image - those are more lightweight (based on the Python slim base image) and optimized to run Prefect flows.
    b) When you use Docker with stored_as_script=True, you need to add the flow to the storage explicitly, as shown in this example on line 44: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py#L44
    In general, I think that if you switch to a Prefect base image and remove those two lines, this should fix the issue for you:
    stored_as_script=True,
    path='path_to_the_current_file.py'
    To understand the difference between pickle and script storage in Docker storage, you can compare these two examples:
    1. Script: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_docker_run_local_image.py
    2. Pickle: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/docker_pickle_docker_run_local_image.py
    Dekel R

    10 months ago
    Thanks for the quick reply. First of all - I changed the Docker image to the base Prefect one and deleted the lines you suggested - and it’s working now - thanks!! As for the questions:
    1. Yes. I will have a CI/CD flow (not a Prefect flow (-: ) that will update the Docker version after each merge to “main”.
    2. Yes, my dependencies may change frequently - and in general, we are planning on using Prefect for all of our pipelines (potentially hundreds of them), so using Docker will eliminate various compatibility issues, and that’s why I chose it. Do you think another strategy would fit better?
    Anna Geller

    10 months ago
    @Dekel R that’s great to hear! The reason I asked those questions was to determine what is changing more frequently:
    1. Only your flow files
    2. Both your flow definitions and your custom modules and third-party libraries.
    If #1 were true, it could be easier to switch to some other storage class, such as one of the Git storage classes (GitHub, Bitbucket, GitLab, CodeCommit, Git) or one of the cloud storage classes (e.g. S3, GCS, Azure). But if #2 is true (and you confirmed it is true in your use case), then Docker storage with the pickled flow option indeed makes sense, because all those custom modules that are changing frequently are baked into the same image as your flow file and the image can be built each time at registration.
    Dekel R

    10 months ago
    I see. Great, thank you!! I’ve now made some changes to my code in order to simplify it. This is the current script:
    with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1-XXX/project/directory/",
                                                             dockerfile="./Dockerfile")) as flow:
        start_date = Parameter('start_date', default='10-11-21')
        end_date = Parameter('end_date', default='10-11-21')
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
    
    client = prefect.Client()
    client.register(flow, project_name=PREFECT_PROJECT_NAME)
    I also use Prefect parameters now. I have 2 additional questions:
    1. When I run my current code locally it works just fine and the flow finishes - but when I run it from Prefect Cloud (run, add some parameters and start the job) I get the following error:
    Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'setup'")
    Are you familiar with that issue? I can’t seem to find any answer online.
    2. I want to schedule this job on a daily basis and change the Parameters based on the time of each run (I’m basically trying to get the current day’s data from an API at the end of that day) - so today start_date will be 16-11-21 and tomorrow 17-11-21. What’s the best way to do that with Prefect? Also - what happens when Prefect misses a run (for any reason)? The behavior I’m expecting is a rerun with the exact same parameters (so if I missed 1 week Prefect will spawn 7 flows, each one with a different day as the date). Thanks again.
    Anna Geller

    10 months ago
    @Dekel R
    1. Looks like something in your flow definition is not serializable with cloudpickle - since you currently use Docker storage with a pickled flow inside, you need to make sure that all objects are serializable and that all custom modules are within the image. Do you have some task that returns a database connection or an HTTP client?
    2. You shouldn’t use Parameter defaults for this. Parameter defaults are frozen at registration time, which means you would load the data from 10-11 every day 🙂 What you can do instead is have a separate task that returns the current date, e.g. pendulum.today(), and then use a Parameter to optionally override that default value. So this would be: if parameter_value is None, use the return value of the task that returns today’s date (or yesterday’s date in your case),
    and the parameter default would then be None or an empty string.
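A minimal sketch of the fallback described in point 2, written with the standard library only so it runs without Prefect. The function name `resolve_start_date`, the `days_back` argument and the `%d-%m-%y` format (inferred from the "16-11-21" example in the thread) are assumptions for illustration. In a real flow this logic would presumably live inside a task that receives the Parameter value, so the date is computed at run time rather than frozen at registration.

```python
from datetime import date, timedelta
from typing import Optional

# Assumed date format, inferred from the "16-11-21" example in the thread.
DATE_FORMAT = "%d-%m-%y"


def resolve_start_date(
    param_value: Optional[str],
    today: Optional[date] = None,
    days_back: int = 0,
) -> str:
    """Return the explicit parameter value, or fall back to a run-time date.

    param_value: value passed by the caller; None or "" means "not set".
    today:       injectable for testing; defaults to the real current date.
    days_back:   1 gives yesterday's date, 0 gives today's.
    """
    if param_value:  # covers both None and an empty string
        return param_value
    reference_day = today or date.today()
    return (reference_day - timedelta(days=days_back)).strftime(DATE_FORMAT)


# An explicit value always wins; otherwise the date is computed when called.
explicit = resolve_start_date("10-11-21")
computed = resolve_start_date(None, today=date(2021, 11, 16))
print(explicit, computed)
```

Passing `today` in explicitly keeps the function easy to test; at run time it would normally be left unset.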
    Regarding:
    if I missed 1 week Prefect will spawn 7 flows, each one with a different day as the date
    you could have a look at the Scheduler docs: The scheduler will never create runs that were scheduled to start in the past. This means that you would then need to explicitly trigger 7 runs, each with a different parameter value for the start date. You can trigger those runs either manually from the UI or from Python code, e.g. using the create_flow_run task.
    Dekel R

    10 months ago
    Hey,
    1. My first task returns a string; the second one just writes some files to Google Cloud Storage. I do use an HTTP client in the task and another Google Cloud Storage client - but I’m not returning them.
    2. I’ll change the default parameters.
    3. OK, is this feature on Prefect’s roadmap? Rerunning missed tasks with their original parameters.
    Anna Geller

    10 months ago
    1. Is this HTTP client defined globally and shared between tasks? This error looked like some serialization issue, but it’s hard to tell without seeing the code. Can you share your flow definition or build a minimal example I could reproduce?
    2. Nice
    3. Do you mean a feature to backfill data based on past schedule intervals? If so, then no, because this is not how the Scheduler service works. But Prefect provides you with building blocks to backfill data as you wish. Taking the use case you provided:
    from prefect import task, Flow
    from prefect.tasks.prefect import create_flow_run
    
    project_name = "your_project_name"
    
    with Flow("backfill-last-week") as flow:
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-11-2021"))
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-12-2021"))
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-13-2021"))
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-14-2021"))
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-15-2021"))
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-16-2021"))
        create_flow_run(flow_name="your_flow", project_name=project_name, parameters=dict(start_date="11-17-2021"))
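The seven near-identical calls above differ only in the date, so the parameter dicts could be generated rather than typed out. The sketch below uses only the standard library; `backfill_parameters` is a hypothetical helper, and the `%m-%d-%Y` format is assumed from the dates in the snippet. Each resulting dict could then be passed as `parameters=` to one `create_flow_run` call.

```python
from datetime import date, timedelta
from typing import Dict, List


def backfill_parameters(
    first_day: date, days: int, fmt: str = "%m-%d-%Y"
) -> List[Dict[str, str]]:
    """Build one parameter dict per day, starting at first_day."""
    return [
        {"start_date": (first_day + timedelta(days=offset)).strftime(fmt)}
        for offset in range(days)
    ]


# One week starting 11 November 2021, matching the dates in the thread.
params = backfill_parameters(date(2021, 11, 11), 7)
print(params[0], params[-1])
# prints: {'start_date': '11-11-2021'} {'start_date': '11-17-2021'}
```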
    Dekel R

    10 months ago
    1. The HTTP client and the Google Cloud Storage client are not defined globally; each task defines a client for itself. In the first task I create a new HTTP client and get data from an API with it, then I use a Google Cloud Storage client to save the data - and I only return a string. The second task opens a new Google Cloud Storage client, reads the data and writes it in Parquet format. The only thing that gets passed between tasks is a string - “json_bucket_prefix”. I can only think of one thing that both tasks use - my logger - do you think it might be the issue? I define a logger in the upper part of this file and both of the tasks use it:
    import logging
    from logger import init_logger
    from prefect import Flow, Parameter
    from prefect.storage import Docker
    import os
    import prefect
    
    from config import LOGGER_NAME, PREFECT_PROJECT_NAME
    from tasks.data_fetcher import fetch_data
    from tasks.data_parser import parse_jsons_to_parquet
    
    
    init_logger()
    log = logging.getLogger(LOGGER_NAME)
    
    
    log.info('Running the api workflow')
    with Flow("data_to_parquet_flow", storage=Docker(registry_url="us-central1-XXX/project/directory/",
                                                             dockerfile="./Dockerfile")) as flow:
        start_date = Parameter('start_date', default='10-11-21')
        end_date = Parameter('end_date', default='10-11-21')
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
    
    client = prefect.Client()
    client.register(flow, project_name=PREFECT_PROJECT_NAME)
    The flow itself is a little complicated so it’s hard for me to share it; I hope the explanation I just gave is good enough. Scheduling-wise - understood, we will probably implement something ourselves in order to rerun more easily. Thanks
    Anna Geller

    10 months ago
    @Dekel R I think you’re right, the logger looks like it may be the culprit here. In general, you should move all logs into the tasks, e.g.
    log.info('Running the api workflow')
    could be defined in the first task, if you need this log line. Basically, everything that is not inside a task will be interpreted and pickled at registration time. And regarding logger, to see your logs in Prefect UI, you should leverage the Prefect logger, you can get it from the context:
    import prefect
    from prefect import task
    
    @task
    def your_task():
        # Grab the logger Prefect already provides in the run context
        logger = prefect.context.get("logger")
        logger.info("Log msg from a task")
    Dekel R

    10 months ago
    Do I need to use a different logger for each task too? Or will a single logger work as long as the logs are only inside the tasks?
    Anna Geller

    10 months ago
    if you add this line to each of your tasks that require logging, this would reuse the same logger that prefect is using for the flow:
    logger = prefect.context.get("logger")
    It doesn’t create a new logger object, it only grabs the already existing Prefect logger from the context.
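The standard library's `logging.getLogger` has the same "fetch, don't create" behaviour, which is also what the `logging.getLogger(LOGGER_NAME)` calls elsewhere in this thread rely on. The sketch below is only an analogy using stdlib logging, not the Prefect context itself, and the logger name is a placeholder.

```python
import logging

LOGGER_NAME = "my_pipeline"  # hypothetical name, stands in for config.LOGGER_NAME

# First lookup creates the logger and configures it.
first = logging.getLogger(LOGGER_NAME)
first.setLevel(logging.INFO)

# A later lookup with the same name returns the very same object,
# so configuration applied once is visible everywhere.
second = logging.getLogger(LOGGER_NAME)
same_object = first is second
level_carried_over = second.level == logging.INFO
print(same_object, level_carried_over)
```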
    Dekel R

    10 months ago
    Right now I’m running this code, which initializes the logger:
    init_logger()
    log = logging.getLogger(LOGGER_NAME)
    In the rest of my files (inside the tasks) I’m only running this -
    log = logging.getLogger(LOGGER_NAME)
    So do you think I need to now run “init_logger” in each one of my tasks (for different loggers) and then use that separate logger for each task? Please see the code of “init_logger” (nothing special)
    import logging
    from config import LOGGER_NAME
    
    
    def init_logger():
        logger = logging.getLogger(LOGGER_NAME)
        logger.setLevel(logging.INFO)
        logger.addHandler(logging.StreamHandler())
        logger.info(f'initialized logger {LOGGER_NAME}')
    I’ll try that (logger = prefect.context.get("logger"))
    Anna Geller

    10 months ago
    yeah, so I think you could just use this logger from the context and not use the init_logger at all. But if you need to configure the Prefect logger in some specific way, you could do that through environment variables or through config.toml, e.g.
    export PREFECT__LOGGING__LEVEL="DEBUG"
    export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s | %(message)s"
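To preview what a format string like the one above produces, it can be fed to a standard `logging.Formatter`. This sketch assumes Prefect passes the configured string to a regular stdlib formatter; the logger name `prefect.your_task` and the file name are made up for the example.

```python
import logging

# Same format string as in the export above.
LOG_FORMAT = "%(levelname)s - %(name)s | %(message)s"

formatter = logging.Formatter(LOG_FORMAT)

# Build a record by hand so nothing is written to a real handler.
record = logging.LogRecord(
    name="prefect.your_task",  # hypothetical logger name
    level=logging.DEBUG,
    pathname="example.py",     # placeholder, not used by this format
    lineno=1,
    msg="Log msg from a task",
    args=None,
    exc_info=None,
)

formatted = formatter.format(record)
print(formatted)
# prints: DEBUG - prefect.your_task | Log msg from a task
```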
    Dekel R

    10 months ago
    I did it but I keep getting the same error. I also have some “common” configuration that both of the tasks use (just a config file with URLs and the like) - is that an issue too? Do the tasks have to be completely isolated?
    Anna Geller

    10 months ago
    @Dekel R btw, many people use KV Store for backfilling purposes, e.g. you can store last loaded date as a key value pair and any new flow run grabs this value and loads data from there. Here is one example https://docs.prefect.io/orchestration/concepts/kv_store.html#using-key-value-pairs-in-flows
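A rough sketch of that "last loaded date" pattern, with a plain dict standing in for the KV Store so it runs anywhere. The key name `last_loaded_date`, the helper names and the `%d-%m-%y` format are assumptions for illustration; in a real flow the dict reads and writes would be replaced by the KV Store calls described in the linked docs.

```python
from datetime import date, datetime, timedelta
from typing import Dict, List

FMT = "%d-%m-%y"            # assumed, matches the "16-11-21" style in the thread
KEY = "last_loaded_date"    # hypothetical key name


def dates_to_load(store: Dict[str, str], today: date) -> List[str]:
    """Return every day after the stored watermark, up to and including today."""
    last = datetime.strptime(store[KEY], FMT).date()
    gap = (today - last).days
    return [(last + timedelta(days=i)).strftime(FMT) for i in range(1, gap + 1)]


def mark_loaded(store: Dict[str, str], day: str) -> None:
    """Advance the watermark once a day's data has been loaded."""
    store[KEY] = day


# Dict stand-in for the KV Store: the last successful load was 14 November.
kv_store = {KEY: "14-11-21"}
pending = dates_to_load(kv_store, today=date(2021, 11, 17))
for day in pending:
    # ... load the data for `day` here ...
    mark_loaded(kv_store, day)
print(pending, kv_store[KEY])
```

Because the watermark only advances after a successful load, a run that was missed simply shows up as extra dates on the next run.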
    Dekel R

    10 months ago
    Nice, seems like a good solution
    Do you have any idea what the best way to proceed with this issue is? I keep getting this error.
    Anna Geller

    10 months ago
    So the logger didn’t fix it? Can you share your most recent flow definition?
    Dekel R

    10 months ago
    Nope... still the same error (runs great locally but not on Prefect Cloud). This is the latest flow definition:
    from prefect import Flow, Parameter
    from prefect.storage import Docker
    import prefect
    import os
    from config import PREFECT_PROJECT_NAME
    from tasks.data_fetcher import fetch_data
    from tasks.data_parser import parse_jsons_to_parquet
    
    with Flow("weather_data_to_parquet_flow", storage=Docker(registry_url="us-central1-XX/project_name4/data-pipelines/",
                                                             dockerfile="./Dockerfile")) as flow:
        start_date = Parameter('start_date')
        end_date = Parameter('end_date')
        json_bucket_prefix = fetch_data(start_date=start_date, end_date=end_date)
        parse_jsons_to_parquet(files_prefix=json_bucket_prefix)
    
    # flow.run(parameters=dict(start_date=os.environ.get('START_DATE'),
    #                          end_date=os.environ.get('END_DATE')))  # parameters=dict(start_date=os.environ.get('START_DATE'), end_date=os.environ.get('END_DATE'))
    
    
    client = prefect.Client()
    client.register(flow, project_name=PREFECT_PROJECT_NAME)
    Anna Geller

    10 months ago
    Using env variables here may cause trouble too:
    os.environ.get('START_DATE'),
    You would need to make sure that those env variables exist both in the environment from which you register your flows and in the execution environment. I think the best way to test it is to use a super simple basic flow, e.g.
    from prefect import task, Flow
    from prefect.storage import Docker
    
    
    @task(log_stdout=True)
    def hello_world():
        print("hello world")
    
    
    with Flow("weather_data_to_parquet_flow", storage=Docker(registry_url="us-central1-XX/project_name4/data-pipelines/",
                                                             dockerfile="./Dockerfile")) as flow:
        hw = hello_world()
    If this works, then you know that it’s something in the flow definition that doesn’t work. Btw, did you know that Prefect has a CLI to register flows? It’s much easier than using a client.