prefect-community
  • m

    Martin Durkac

    10/18/2021, 7:25 AM
    Hi guys. I was not able to find any solution for my current problem. Is it possible to delete flow run history from a specific day (like yesterday), or, for frequently repeated flows, to delete history older than 5 hours? I ran into a memory problem on EC2: my flow generated 11,000 flow runs and the instance ran out of memory. I need to make a script which removes older runs based on flow ID or name. Thanks for your help and ideas. 🙂
    a
    a
    +1
    8 replies · 4 participants
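    A minimal sketch of one possible cleanup approach, assuming a Prefect 1.x backend whose GraphQL API exposes a delete_flow_run mutation (the flow ID, argument types, and schema details here are assumptions to verify against your Server version):
    import pendulum
    from prefect.client import Client

    client = Client()
    cutoff = pendulum.now("UTC").subtract(hours=5)

    # Look up runs of one flow that started before the cutoff
    # ("FLOW_ID" is a placeholder for your real flow ID).
    runs = client.graphql(
        """
        query($flowId: uuid, $cutoff: timestamptz) {
          flow_run(where: {flow_id: {_eq: $flowId}, start_time: {_lt: $cutoff}}) {
            id
          }
        }
        """,
        variables={"flowId": "FLOW_ID", "cutoff": cutoff.isoformat()},
    )

    # Delete each matching run to free up the backend database.
    for run in runs.data.flow_run:
        client.graphql(
            """
            mutation($id: UUID!) {
              delete_flow_run(input: {flow_run_id: $id}) {
                success
              }
            }
            """,
            variables={"id": run.id},
        )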
  • m

    Mourad Hamou-Mamar

    10/18/2021, 8:27 AM
    Hello everyone, I'm having difficulty trying to use GitStorage on my Prefect instance. I have set up a Prefect server and a flow that is pulled via GitLab. Everything is working fine on that side: my flow and all of the directories where my flow is stored are pulled. But some of my scripts can't find my modules. I have imported the root directory into sys.path in my flow.py using:
    import os
    import sys
    from pathlib import Path

    parent_path = Path(__file__).resolve().parent
    sys.path.append(os.path.relpath(parent_path))
    My structure looks like this:
    .
    ├── flow.py
    ├── functions
    │   ├── function_mapping
    │   │   ├── function_a.py
    │   │   └── function_b.py
    │   └── function_transform
    │       ├── function_c.py
    │       └── function_d.py
    └── task
        ├── mapping
        │   └── task_a.py
        └── transform
            ├── task_b.py
            └── task_c.py
    In my flow.py, I import my task like this:
    # in flow.py
    from task.mapping.task_a import task_a
    And it works for the `task` module. But in my tasks, when I try to use my functions, they aren't found:
    # in task_a.py
    from functions.function_mapping.function_a import function_a
    I always get the error message:
    ModuleNotFoundError: No module named 'functions'
    I don't get why it wouldn't find my `functions` module, since I make the import from the root directory and that directory is added to sys.path. If anyone has any idea how I could make this work, or what I should try in order to debug it, any help would be greatly appreciated. Thanks everyone in advance.
    a
    k
    15 replies · 3 participants
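    One thing worth checking here (a sketch of a possible fix, not a confirmed diagnosis): os.path.relpath resolves against whatever the working directory happens to be when the agent executes the flow, so appending the root as an absolute path is more robust:
    import sys
    from pathlib import Path

    # Append the repo root as an absolute path so imports resolve
    # regardless of the agent's working directory at runtime.
    parent_path = str(Path(__file__).resolve().parent)
    if parent_path not in sys.path:
        sys.path.append(parent_path)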
  • m

    Marko Herkaliuk

    10/18/2021, 8:43 AM
    Hello everybody. Prefect showed some very strange behavior today, which did not happen before. In many Flows I have parameters defined like
    start_date = Parameter("start_date", default=pendulum.now("Europe/Kiev").add(days=-1).to_date_string())
    start_date = Parameter("start_date", default=(date.today() - timedelta(1)).strftime("%Y-%m-%d"))
    and usually everything worked as needed, i.e. the value of the parameter was calculated during the execution of the script. However, I noticed that on Saturday, Sunday and Monday (today) all such parameters were passed as `2021-10-14`, i.e. as if the Flow had been launched on Friday. Last weekend this was not the case (nor in general before).
    a
    k
    +1
    24 replies · 4 participants
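    For context: a Parameter default like this is evaluated once, when the flow is serialized at registration, not at each run. A hedged sketch of the usual workaround, resolving the date inside a task at run time:
    import pendulum
    from prefect import Flow, Parameter, task

    @task
    def resolve_start_date(start_date):
        # Computed at run time, so "yesterday" tracks the actual run date
        if start_date is not None:
            return start_date
        return pendulum.now("Europe/Kiev").subtract(days=1).to_date_string()

    with Flow("example") as flow:
        start_date = Parameter("start_date", default=None)
        resolved_start_date = resolve_start_date(start_date)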
  • s

    Sergey Shamsuyarov

    10/18/2021, 9:05 AM
    Hi, maybe someone knows: I am using the image prefecthq/apollo:core-0.15.6 to run my local server. Is it possible to disable the GraphQL Playground when navigating to http://localhost:4200/graphql? The typical NODE_ENV=production didn't work.
    a
    11 replies · 2 participants
  • t

    Thomas Furmston

    10/18/2021, 9:49 AM
    Hi, I am trying to set up a flow of flows with parameters being passed from the parent flow to the child flow, but am having some issues. With hard-coded values for the parameters it works, so it seems to be related to how I am trying to set up the parameter passing.
    a
    k
    82 replies · 3 participants
  • t

    Thomas Furmston

    10/18/2021, 9:49 AM
    import logging
    import pendulum
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.storage import Docker
    from prefect.schedules import CronSchedule
    from prefect.tasks.prefect import StartFlowRun
    
    
    logger = logging.getLogger(__name__)
    
    weekday_schedule = CronSchedule(
        '41 10 * * 1-5',
        start_date=pendulum.now(tz='Europe/London')
    )
    
    
    @task
    def calculate_flow_end_date(end_date: str):
        if end_date is not None:
            return end_date
        return prefect.context.get('scheduled_start_time').to_date_string()
    
    
    common_flow = StartFlowRun(
        flow_name='already_existing_flow1',
        project_name='my_project_name',
        wait=True,
    )
    baseline_flow = StartFlowRun(
        flow_name='already_existing_flow2',
        project_name='my_project_name',
        wait=True,
    )
    
    with Flow("my_scheduled_flow",
              schedule=weekday_schedule,
              storage=Docker(
                  base_image='my-docker-image:latest',
                  local_image=True,
              )) as flow:
    
        num_days_parameter = Parameter('num_days', default=1)
        num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
        end_date_parameter = Parameter('end_date', default=None)
    
        task_end_date = calculate_flow_end_date(end_date_parameter)
        logger.info('Task End Date: %s', task_end_date)
        logger.info('Num Days: %s', num_days_parameter)
        logger.info('Num Days Backfill: %s', num_back_fill_days_parameter)
    
        common_flow_result = common_flow(parameters={
            'num_days': num_days_parameter,
            'num_back_fill_days': num_back_fill_days_parameter,
            'end_date': task_end_date,
        })
        baseline_flow_result = baseline_flow(
            upstream_tasks=[common_flow_result],
            parameters={
                'num_days': num_days_parameter,
                'num_back_fill_days': num_back_fill_days_parameter,
                'end_date': task_end_date,
            }
        )
  • t

    Thomas Furmston

    10/18/2021, 9:50 AM
    ☝️ This is the parent flow. And the child flow is below.
  • t

    Thomas Furmston

    10/18/2021, 9:50 AM
    with Flow(
        'common_flow',
        storage=Docker(
            base_image='my-docker-image:latest',
            local_image=True,
        )) as flow:
    
        num_days_parameter = Parameter('num_days', required=True)
        num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
        end_date_parameter = Parameter('end_date', required=True)
    
        served_ads_command = ShellTask(
            name='my_first_task',
            command=construct_etl_command(
                app_mode=settings.app_mode,
                table_name='task1',
                num_days_cli_arg='num_back_fill_days',
                num_days=num_back_fill_days_parameter,
                end_date=end_date_parameter,
                database=settings.database,
            ),
            stream_output=True,
        )
  • t

    Thomas Furmston

    10/18/2021, 9:51 AM
    The parent flow is calling the child flow as expected, but then the child flow dies with the following error:
  • t

    Thomas Furmston

    10/18/2021, 9:51 AM
    [2021-10-18 09:30:15+0000] INFO - prefect.served_advert_task | /tmp/prefect-ufbr5w5b: line 1: Parameter:: No such file or directory
    [2021-10-18 09:30:15+0000] ERROR - prefect.served_advert_task | Command failed with exit code 1
  • t

    Thomas Furmston

    10/18/2021, 9:52 AM
    Is there something I am doing wrong?
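    One plausible reading of that error (a sketch, not a confirmed diagnosis): construct_etl_command runs at flow build time, so it receives Parameter objects rather than their runtime values, and the Parameter repr leaks into the shell command. Deferring the command construction to a task would look roughly like this:
    from prefect import Flow, Parameter, task
    from prefect.tasks.shell import ShellTask

    served_ads_task = ShellTask(name='my_first_task', stream_output=True)

    @task
    def build_command(num_back_fill_days, end_date):
        # construct_etl_command and settings come from the snippet above;
        # here they are invoked at run time with resolved parameter values.
        return construct_etl_command(
            app_mode=settings.app_mode,
            table_name='task1',
            num_days_cli_arg='num_back_fill_days',
            num_days=num_back_fill_days,
            end_date=end_date,
            database=settings.database,
        )

    with Flow('common_flow') as flow:
        num_back_fill_days = Parameter('num_back_fill_days', required=True)
        end_date = Parameter('end_date', required=True)
        served_ads_task(command=build_command(num_back_fill_days, end_date))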
  • e

    Eric Feldman

    10/18/2021, 11:23 AM
    Hi 🙂 can I run a Flow on an agent without registering it first?
    a
    k
    20 replies · 3 participants
  • b

    Barbara Abi Khoriati

    10/18/2021, 1:14 PM
    Hi! Is there any way you can set an ID or name to a schedule?
    k
    7 replies · 2 participants
  • e

    Eddie

    10/18/2021, 2:21 PM
    Is there already an idiomatic way to make `wait_for_flow_run` tasks fail if the child flow run was a failure? I know that `wait_for_flow_run` returns a `FlowRunView`, so I assume it is possible to define another task that raises an exception if the run view state `is_failed()`, but I am curious whether there is already an interface for this behavior in the built-in tasks.
    k
    m
    7 replies · 3 participants
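    A minimal sketch of such a follow-up task, assuming the FlowRunView returned by wait_for_flow_run exposes its state (attribute names worth checking against your Prefect version):
    from prefect import task
    from prefect.engine.signals import FAIL

    @task
    def raise_on_failed_child(flow_run_view):
        # flow_run_view is the FlowRunView returned by wait_for_flow_run
        if flow_run_view.state.is_failed():
            raise FAIL(f"Child flow run {flow_run_view.flow_run_id} failed")
        return flow_run_view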
  • c

    Constantino Schillebeeckx

    10/18/2021, 6:08 PM
    Is `prefect.context.get("scheduled_start_time")` timezone aware? If not, is there an assumed timezone of UTC? If I schedule my flow with something like `CronSchedule(cron, start_date=pendulum.datetime(2021, 1, 1, tz=tz))`, will the "scheduled_start_time" have the same timezone as the cron schedule?
    k
    3 replies · 2 participants
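    A quick empirical check rather than an authoritative answer: the value is a pendulum DateTime, so it can report its own timezone at run time:
    import prefect
    from prefect import task

    @task
    def show_scheduled_start():
        sst = prefect.context.get("scheduled_start_time")
        # pendulum DateTimes are timezone-aware; print the zone and a UTC view
        print(sst, sst.timezone_name, sst.in_tz("UTC"))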
  • r

    Rowan Gaffney

    10/18/2021, 6:21 PM
    Are there any examples of running a docker agent, in a docker container with flows being run with DockerRun from script storage? I think I am having issues with the Docker-in-Docker paradigm
    k
    11 replies · 2 participants
  • a

    Anatoly Alekseev

    10/18/2021, 6:23 PM
    Hello, several times now I have hit a problem where a task with a timeout never actually times out. The Python process that is orchestrated by Prefect is sometimes terminated by the GPU (something like OOM, not important now). I use the @task(state_handlers=failure_notifiers, nout=2, timeout=60 * 60 * 4) decorator to set a timeout of 4 hours. In the Cloud dashboard, there is no mention of the timeout that I requested for that task. Is the absence of timeout info in the dashboard expected, or is my timeout somehow not being set?
    k
    m
    9 replies · 3 participants
  • b

    Ben Muller

    10/19/2021, 12:44 AM
    Hey team, quick one about mapping. if I have some code like so:
    with Flow(
        name="horse_racing_data",
    ) as flow:
        dates = get_dates_task(days_back=days_back, days_ahead=days_ahead, dt_format="%d-%b-%Y")
    
        raw_sectional_data = apply_map(get_puntingform_sectional_dump_task, date=dates)
    
    
        spell_stats_data = apply_map(
            query_db_for_df_task, path_to_sql=unmapped("sql/select_spell_count.sql")
        )
        
        enriched_pf_data = apply_map(
            calculate_runners_spell_stats_task, pf_sectional_df=raw_sectional_data, spell_data=spell_stats_data
        )
    I am making multiple separate `apply_map` calls and I just wanted to make sure: when calling `calculate_runners_spell_stats_task`, can I guarantee the order of the returned maps? What I mean is that `raw_sectional_data` and `spell_stats_data` are iterables, and as they are provided to the function it is important that they maintain the same order. Am I all good here?
    k
    15 replies · 2 participants
  • g

    Gabi Pi

    10/19/2021, 6:40 AM
    Hey everyone, sorry about the noob question - I see from the documentation that a Kubernetes flow can be configured to run in one container only (the docker image for the entire flow). Is it possible to set up a flow in which each task runs within its own container?
    k
    a
    6 replies · 3 participants
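    One hedged option for a per-task-container pattern in Prefect 1.x: have a task launch its own Kubernetes Job through the built-in kubernetes task library (the image name and job spec below are illustrative):
    from prefect import Flow
    from prefect.tasks.kubernetes import RunNamespacedJob

    # A minimal Job manifest; the task's real work runs in this image.
    job_spec = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": "my-task-job"},
        "spec": {
            "template": {
                "spec": {
                    "containers": [{"name": "main", "image": "my-task-image:latest"}],
                    "restartPolicy": "Never",
                }
            }
        },
    }

    run_job = RunNamespacedJob(
        body=job_spec,
        namespace="default",
        delete_job_after_completion=True,
    )

    with Flow("per-task-containers") as flow:
        run_job()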
  • e

    Eric Feldman

    10/19/2021, 8:46 AM
    Hello 🙂 I see in the `prefect.core.flow.Flow.serialized_hash` documentation that if the hash of the flow hasn't changed, it won't be uploaded to the server when calling `register`. But I get the same hash every time, and the server keeps creating new versions of the flow 🤔
    a
    a
    20 replies · 3 participants
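    For reference, the skip-on-unchanged behavior comes from passing the hash as an idempotency key, not from register() alone; a minimal sketch:
    flow.register(
        project_name="my-project",
        # registration is skipped (no version bump) while this key is unchanged
        idempotency_key=flow.serialized_hash(),
    )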
  • s

    Stefano Cascavilla

    10/19/2021, 9:24 AM
    Hi everyone 😄 We've recently upgraded to Prefect version 0.15.6 (both server and flows), but if we start a flow from another flow using `StartFlowRun.run()`, this error is shown:
    Error during execution of task: ClientError([{'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]', 'locations': [{'line': 2, 'column': 5}], 'path': ['create_task_run_artifact'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]'}}}])
    We are migrating from version `0.14.22`, and we are using `server` as the backend locally. Can anybody help?
    a
    a
    +1
    18 replies · 4 participants
  • l

    Lukáš Polák

    10/19/2021, 9:35 AM
    Hi everyone 🙂 we're playing with Adjustments in Schedules (https://docs.prefect.io/api/latest/schedules/adjustments.html#functions). Our goal is simple - implement a simple jitter for flow execution (we would like to spread the flow start times somewhat randomly in time in order to make the work distribution more even). The documentation does not explicitly state that you cannot create your own functions. Yet, you list them out here - https://github.com/PrefectHQ/prefect/blob/8807f335fa1385275d9dd28faa857e5fd22536b7/src/prefect/serialization/schedule.py#L28. Is there a way to easily "plug in" a custom function? Honestly, we're more than happy to contribute this function to the codebase, if the community finds it useful - so that's also an option for us. Any help or feedback is appreciated 🙂 thank you 🙂
    a
    k
    11 replies · 3 participants
  • m

    Mark Fickett

    10/19/2021, 1:39 PM
    Does Prefect have plugin support? Hi all! I'm working on setting up some new pipelines and figuring out what tools to use. One pattern we have now is that the software team maintains the core ETL, and the data science team contributes a few final stages to the pipeline to compute new metrics. The DS team contributions tend to change more rapidly, may not be as robust, and the DS team may want to re-run stages with new algorithms more frequently than the base pipeline runs. So it would be nice to have separate versioning/releasing of pipeline stages from the different teams. Is there a way with Prefect to pull some portions of the pipeline from a separate repo or anything like that? (We could also manage this outside of the orchestration framework, like having CD combine portions of the pipeline from different sources, or having two dependent pipelines with one triggering the next.)
    k
    m
    5 replies · 3 participants
  • h

    haf

    10/19/2021, 2:56 PM
    Anyone using editable / source dependencies in your flows?
    k
    70 replies · 2 participants
  • t

    Thomas Furmston

    10/19/2021, 3:01 PM
    Hi, just a preliminary question at the moment, but is there a way to ensure the flows registered to a project are in sync with the flows defined in my code base?
  • t

    Thomas Furmston

    10/19/2021, 3:02 PM
    From what I can see, it seems that registration adds/updates flows. I suppose I am looking more for a sync type process, so that old ones are also deleted.
    k
    18 replies · 2 participants
  • k

    Kevin

    10/19/2021, 8:09 PM
    Hi - is anyone using Azure AKS Agent and Jobs with Prefect Cloud? I'm having a really difficult time understanding how everything is tied together. Right now, I am trying to run a flow that is stored within Azure storage and runs as a kubernetes job (presumably within the same cluster my agent is in). But I am running into the following: Exception('Azure connection string not provided. Set `AZURE_STORAGE_CONNECTION_STRING` environment variable or save connection string as Prefect secret.')
    k
    a
    35 replies · 3 participants
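    A hedged sketch of one way to wire this up: the flow-run job needs the connection string at runtime, either as an environment variable on the run config (shown below) or as a Prefect Secret named AZURE_STORAGE_CONNECTION_STRING; the image and container names are placeholders:
    from prefect.run_configs import KubernetesRun
    from prefect.storage import Azure

    flow.storage = Azure(container="flows")
    flow.run_config = KubernetesRun(
        image="my-image:latest",
        # for production, prefer a Kubernetes/Prefect secret over an inline value
        env={"AZURE_STORAGE_CONNECTION_STRING": "<connection-string>"},
    )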
  • j

    John Jacoby

    10/19/2021, 8:33 PM
    Hi all! I'm pretty new to distributed computing and have never used Dask so please excuse me if this is a simple question. Is there a way to run Prefect (Prefect 1, not the new Orion) on a SLURM cluster where each task is submitted as a separate job? Is this something that could be achieved via the Dask executor or is this different functionality? I'm pretty comfortable with Python and OOP so if there's a way to create a custom executor that could do this, I would appreciate guidance on that as well. Thank you!
    k
    m
    18 replies · 3 participants
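    A sketch of the Dask route, assuming dask-jobqueue is available on the cluster: each Dask worker is submitted as a SLURM job and Prefect tasks run on those workers, so it is close to, but not exactly, one job per task:
    from prefect.executors import DaskExecutor

    flow.executor = DaskExecutor(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={
            "cores": 4,          # cores per SLURM job
            "memory": "16GB",    # memory per SLURM job
            "queue": "general",  # illustrative partition name
        },
        adapt_kwargs={"minimum": 1, "maximum": 10},
    )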
  • m

    Matt Alhonte

    10/19/2021, 9:54 PM
    Heya! So, we're eyeing how to transition from an Environment to Executors and Run_Configs. One blocker is that for some Flows, we change the cluster based on certain parameters. We have some ideas for how to implement that - do any of these seem like they'd work well?
    1. Flow-within-a-Flow that's registered with a different `run_config` based on params, and then kicked off from the Outer Flow
    2. Spinning up a Dask Cluster from within the Flow and submitting tasks to it (and the Flow itself just runs in a small container): https://docs.prefect.io/core/idioms/resource-manager.html https://docs.prefect.io/api/latest/tasks/resources.html
    3. Maybe start experimenting with Adaptive Scaling for Dask Clusters?
    m
    k
    18 replies · 3 participants
  • h

    haf

    10/20/2021, 8:06 AM
    Is there a good way to make "quick run" do a quick run in Prefect Cloud? Seems like it only schedules.
    n
    a
    173 replies · 3 participants
h

haf

10/20/2021, 8:06 AM
Is there a good way to make "quick run" do a quick run in Prefect Cloud? Seems like it only schedules.
n

Noah Holm

10/20/2021, 8:08 AM
It is scheduled for running immediately with quick run so a matching agent should pick it up the next time it polls Cloud (within 10 seconds)
h

haf

10/20/2021, 8:08 AM
It doesn't
It just says scheduled
n

Noah Holm

10/20/2021, 8:09 AM
I would assume it’s your label/agent config that’s incorrect then, otherwise a bug in cloud because what you want is what should happen
h

haf

10/20/2021, 8:10 AM
https://cloud.prefect.io/logary/flow-run/a357c240-c909-4657-aa71-1846c27b1912
Not doing anything in particular...
n

Noah Holm

10/20/2021, 8:12 AM
Quick run works well for me in cloud. Do you have agents with labels matching your flow?
:upvote: 1
👀 1
h

haf

10/20/2021, 8:12 AM
Probably not
Trying again
n

Noah Holm

10/20/2021, 8:13 AM
Here’s the docs for that
h

haf

10/20/2021, 8:15 AM
It keeps registering my laptop name as a label?
n

Noah Holm

10/20/2021, 8:17 AM
Did you give any args.labels? I think that might be the default if you don’t set any labels
h

haf

10/20/2021, 8:17 AM
from argparse import ArgumentParser
from os import path

parser = ArgumentParser(add_help=False)
parser.add_argument(
    "--debug",
    default=False,
    required=False,
    action="store_true",
    dest="debug",
    help="debug flag",
)

subparser = parser.add_subparsers(dest="command")
register = subparser.add_parser("register")
run = subparser.add_parser("run")

register.add_argument("-c", "--commit-ref", dest="commit_ref", type=str, required=True)
register.add_argument("-p", "--project-name", dest="project_name", type=str, default="dbt")
register.add_argument("-l", "--labels", action="append", default=[])
register.add_argument("--build", dest="build", action="store_true", default=False)

run.add_argument(
    "--run-on-schedule", dest="run_on_schedule", action="store_true", default=False
)
run.add_argument(
    "--basepath", dest="basepath", type=str, default=path.dirname(path.realpath(__file__))
)

args = parser.parse_args()
Defaults to empty. Calling with `--labels prod`.
n

Noah Holm

10/20/2021, 8:18 AM
What are the labels in cloud? Is it set to [prod, laptop-name]?
h

haf

10/20/2021, 8:19 AM
Yes
n

Noah Holm

10/20/2021, 8:20 AM
Could be that it always defaults to that when doing local stuff, haven’t tried local agent/cloud enough. I think the easiest would be to add laptop name in your local agent since I assume you only have prod right now
(you have docker agent not the actual prefect local agent but it’s still local on your machine)
h

haf

10/20/2021, 8:34 AM
I can't add my laptop name in CI/CD and in prod because my laptop isn't there, and others have to be able to deploy
n

Noah Holm

10/20/2021, 8:35 AM
But you’re using local storage? This has to run on your machine?
h

haf

10/20/2021, 8:35 AM
No
It's running inside GKE in a container
n

Noah Holm

10/20/2021, 8:36 AM
Your code ☝️ uses local storage https://docs.prefect.io/orchestration/flow_config/storage.html#local
And if using GKE you need a KubernetesAgent
h

haf

10/20/2021, 8:38 AM
I know it does
https://prefect-community.slack.com/archives/CL09KU1K7/p1634655409057200
I was just recommended using it, because I'm building a docker container with it
So I want prefect to just load data from there
n

Noah Holm

10/20/2021, 8:48 AM
Huh, ok this is some advanced stuff. 😮 Well ok, so you're getting your laptop hostname when registering from there because of the local storage. You should be able to omit that with the `add_default_labels=False` kwarg at least: https://docs.prefect.io/api/latest/storage.html#local
:upvote: 1
I don’t understand how the Docker run config works on an agent on GKE, would love to hear if you accomplish what you want
a

Anna Geller

10/20/2021, 8:59 AM
@Noah Holm you are 100% correct that the hostname is attached as a label when local storage and local agents are used. This default label is extremely useful, because the flow is then stored on a specific machine and thus can only run successfully there. And you are also right that when a FlowRun gets stuck in a Scheduled state, it's most likely due to:
• agent misconfiguration, i.e. labels not matching between agent and flow
• the agent not being healthy
• or an expired API key
@haf did you manage to solve the issue?
h

haf

10/20/2021, 9:27 AM
I'm not sure how this is complex; isn't the default that people run their stuff in the cloud on k8s? 😉
🤦‍♂️ 1
And using containers?
Tried without the auto-labels now
ok, so DockerRun doesn't work with k8s
Love the name
vegan-bear
😂 1
Nope, no cheese
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
a

Anna Geller

10/20/2021, 9:36 AM
@haf many users run Prefect flows on K8s. This error is much more informative! It looks like run config is working, but storage doesn’t. Can you share the flow code that led to this error?
h

haf

10/20/2021, 9:37 AM
if args.debug:
    prefect.config.logging.level = "DEBUG"

if args.command == "run":
    prefect.context["basepath"] = args.basepath
    print(f"prefect.context.get(basepath)='{prefect.context.get('basepath')}'")
    flow.run(run_on_schedule=args.run_on_schedule)

elif args.command == "register":
    image = f"europe-docker.pkg.dev/logary-delivery/cd/data-pipelines:{args.commit_ref}"
    print(f"Registering flow with labels={args.labels} image={image}")

    flow.schedule = IntervalSchedule(start_date=at_night(), interval=timedelta(hours=24))
    flow.storage = Local(
        path="/app/flows/run_mmm.py",
        stored_as_script=True,
        add_default_labels=False,
    )
    flow.run_config = KubernetesRun(
        image=image,
        labels=args.labels,
    )
    flow.register(
        project_name=args.project_name,
        build=args.build,
        idempotency_key=args.commit_ref,
        labels=args.labels,
        add_default_labels=False,
    )
a

Anna Geller

10/20/2021, 9:41 AM
I see - so if you are using KubernetesRun, you should use non-local storage, because your Kubernetes pod can't access your local resources; they reside outside of the pod. You could try one of the following:
• one of the Git storage classes: Git, GitHub, GitLab, Bitbucket - depending on what you use as your VCS - as long as you have a Personal Access Token stored as a Secret, the Kubernetes job running your flow will be able to grab the flow code from there
• one of the Cloud storage classes: S3, GCS, Azure - this would also require authenticating to the specific cloud provider to be able to grab the flow code from there
You mentioned you use GKE, so a GCS bucket would potentially be a good storage mechanism for you: https://docs.prefect.io/orchestration/flow_config/storage.html#google-cloud-storage
h

haf

10/20/2021, 9:45 AM
I want to access local things in the image
I don't want to separate the code from the binary artefact that is scheduled
a

Anna Geller

10/20/2021, 9:48 AM
Then it would make sense to use Local storage and Local agent - this way you have access to all local resources for development. And once you are ready to deploy it so that it runs on schedule, you can package your dependencies into a container image so that the container can be spun up anywhere - any Kubernetes cluster on any cloud. Basically, in order for your Docker image to run reliably on GKE, everything that your code needs must be within the Docker image, so no dependency on local resources. Does it make sense?
h

haf

10/20/2021, 9:49 AM
Yes it makes sense and this is exactly the way I've done it
Except "local agent"
:upvote: 1
I can't schedule things on Prefect cloud with "local agent"; it'll complain
a

Anna Geller

10/20/2021, 9:50 AM
you absolutely can schedule things with Local Agent! 🙂 I’m sure we can sort it out
h

haf

10/20/2021, 9:51 AM
But the problem isn't the agent, the problem is that the runtime can't find the flow?
a

Anna Geller

10/20/2021, 9:53 AM
the easiest thing to do is to start the local agent from the terminal in the default configuration:
prefect agent local start
Then, you don’t even need to pass any storage or agent, because Local storage and agent are the defaults:
# hw_flow.py
from prefect import task, Flow


@task(log_stdout=True)
def hello_world():
    print("hello world")


with Flow("idempotent-flow") as flow:
    hw = hello_world()
Then, you can use the CLI to register your flow to the Prefect Cloud:
prefect register --project YOUR_PROJECT_NAME -p hw_flow.py
just following the defaults, you should be able to see this flow in your Cloud account, as long as you authenticated your local machine with the API key:
prefect auth login --key "YOUR_KEY"
h

haf

10/20/2021, 9:55 AM
Yes I've tried local agent before and it worked the way you mention
But I don't want to use the local agent, I want to use the k8s agent?
a

Anna Geller

10/20/2021, 9:56 AM
this is the deployment stage. I thought we are still in the development phase 🙂
h

haf

10/20/2021, 9:56 AM
No, I can run it locally just fine
Except that I can't "schedule it"
Do you mean I should debug with local agent and not k8s agent?
a

Anna Geller

10/20/2021, 9:57 AM
ok, if you are ready for deployment, you need to have a look at your dependencies: do you use some 3rd party libraries or custom modules that your flow needs?
h

haf

10/20/2021, 9:57 AM
The feedback loop is actually pretty small
yes I do
A lot of them
so I use pipfile and have set up a docker container for it
a

Anna Geller

10/20/2021, 9:57 AM
Then those need to be installed in the Dockerfile
h

haf

10/20/2021, 9:57 AM
and that has all the flows in it, I've verified that.
$ docker run --rm -it europe-docker.pkg.dev/logary-delivery/cd/data-pipelines:xxx
            _____  _____  ______ ______ ______ _____ _______
           |  __ \|  __ \|  ____|  ____|  ____/ ____|__   __|
           | |__) | |__) | |__  | |__  | |__ | |       | |
           |  ___/|  _  /|  __| |  __| |  __|| |       | |
           | |    | | \ \| |____| |    | |___| |____   | |
           |_|    |_|  \_\______|_|    |______\_____|  |_|

Thanks for using Prefect!!!

This is the official docker image for Prefect Core, intended for executing
Prefect Flows. For more information, please see the docs:
<https://docs.prefect.io/core/getting_started/installation.html#docker>

root@514926ea8f6a:/app# ls
Pipfile  Pipfile.lock  dbt  dbt_project.yml  flows  infer  packages.yml  postinstall.py  profiles.yml
root@514926ea8f6a:/app# ls flows
__pycache__	   exchange_rates.py		      run_mmm.py
dask-worker-space  exchange_rates__insert_rate.sql    run_mmm__metrics_eligible_channels.sql
dbt.py		   exchange_rates__missing_dates.sql  run_mmm__revenues_eligible_apps.sql
root@514926ea8f6a:/app# cd flows
root@514926ea8f6a:/app/flows# l
bash: l: command not found
root@514926ea8f6a:/app/flows# pwd
/app/flows
root@514926ea8f6a:/app/flows# exit
logout
a

Anna Geller

10/20/2021, 9:58 AM
The thing is: you don't need a virtual env in the Docker image - the image is basically a replacement for one. So I would honestly get rid of the pipfile and specify everything you need in a plain and simple Dockerfile.
h

haf

10/20/2021, 9:59 AM
Well...
I still need to do a pip install
a

Anna Geller

10/20/2021, 9:59 AM
can you share your Dockerfile?
h

haf

10/20/2021, 9:59 AM
and I still need to freeze my versions of the deps
FROM prefecthq/prefect:0.15.4-python3.8

RUN pip install --upgrade pip setuptools wheel twine \
    && pip install pipenv \
    && apt-get update \
    && apt-get install -y --no-install-recommends curl gcc python3-dev libssl-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY Pipfile* packages.yml profiles.yml .user.yml .python-version dbt_project.yml postinstall.py ./
COPY infer ./infer

RUN PIPENV_VENV_IN_PROJECT=1 pipenv install --deploy
ENV PATH="/app/.venv/bin:$PATH"

RUN python postinstall.py

COPY flows ./flows
COPY dbt ./dbt
That's why I use pipfile
a

Anna Geller

10/20/2021, 9:59 AM
yes, but we can simply do that:
RUN pip install -r requirements.txt
h

haf

10/20/2021, 9:59 AM
I don't have requirements.txt
I have a pipfile because it makes it more consistent
I can generate requirements.txt
but this isn't the problem
I can run things in this virtual env, as shown by running
python postinstall.py
And I'm not crashing on "module does not exist" anymore
a

Anna Geller

10/20/2021, 10:01 AM
awesome. I think what is missing now is only Storage then
h

haf

10/20/2021, 10:01 AM
I have a pipfile because it makes it more consistent
If this really is my issue then I'm happy to discuss the hows and whys of this, but I don't think it is
a

Anna Geller

10/20/2021, 10:01 AM
basically, you need to tell your Flow where to find your flow code
got it. Yeah I think you totally crushed it with the dependency management in your Dockerfile, and this is not an issue. The only thing missing is building a Storage for your flow code - Can you try a GCS bucket? Otherwise, if you want to keep it all in your Docker image, you can use Docker storage and ensure that you copy your flow code to the image:
COPY /path/to/your/flow.py .
h

haf

10/20/2021, 10:06 AM
I don't want to use GCS because I want to keep the deployable artefact self-contained.
I don't want one run to fail one day but work the other or vice versa.
a

Anna Geller

10/20/2021, 10:06 AM
I totally get it, I think Docker storage is probably the easiest since you already are building a Docker image - I have some docs for you to get started, I think you know enough to manage this, but LMK if you face any issues along the way: • https://docs.prefect.io/orchestration/recipes/configuring_storage.html • https://docs.prefect.io/api/latest/storage.html#docker
h

haf

10/20/2021, 10:06 AM
Yes, I'm facing issues along the way 🙂
Like you can see, I'm already copying the flows
a

Anna Geller

10/20/2021, 10:08 AM
Can you try copying only the flow .py files to the WORKDIR?
COPY flows .
h

haf

10/20/2021, 10:08 AM
yes sure
Just one thing: you recommend Docker storage, but since I'm building my own Dockerfile this is not supposed to be required; furthermore, Docker storage will pickle your flows.
When you spawn pickled flows, it crashes (because it doesn't pickle source dependencies, for some completely unknown reason), so that's why I'm not using Docker storage. Just to be clear 😉
So now you're thinking it might be a problem that it'll only look in the current dir?
Despite me giving a fully qualified path?
a

Anna Geller

10/20/2021, 10:13 AM
You can pass `stored_as_script=True` in Docker storage. I'm not really recommending Docker storage - I think it would be easier with GCS 🙂 - but this was your preference, and Docker storage is the easiest to get started with because you can pass your Dockerfile and it will be built any time you register your flow, so you can ensure all dependencies are baked into the image. You need to use Docker storage, not local storage - remember the links to the docs I sent you?
h

haf

10/20/2021, 10:20 AM
Yes I remember the docs you sent me.
But to start out I don't want to build my dockerfile every time I register a flow, because that makes it impossible to tag it properly.
Secondly, if you look at the code, I've already packaged the image, so I don't need to rebuild it.
Thirdly, Docker storage by default tries to add files into the image itself, which is superfluous for me since I've already built the image
This means I'd end up with
flow.storage = Docker(
            path="/app/flows/run_mmm.py",
            image_name=image_base,
            image_tag=args.commit_ref,
            stored_as_script=True,
            add_default_labels=False,
        )
Right?
So while this all registers; I'm still getting
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
@Anna Geller Have you seen https://github.com/kvnkho/demos/blob/main/blogs/prefect-docker/docker_with_local_storage/workflow/flow.py#L20 ?
https://prefect-community.slack.com/archives/CL09KU1K7/p1634710386098900?thread_ts=1634655409.057200&amp;cid=CL09KU1K7
a

Anna Geller

10/20/2021, 10:38 AM
Sure, you can try it. I haven't done it myself so it's hard to help - but I will try! Overall, I believe the safest option is to match storage and run configuration:
• LocalRun + Local storage
• DockerRun + Docker storage
And once you move to Kubernetes, you can package your flow dependencies into a Docker image that will likely remain mostly static in contrast to your flow files. You then pass this image to your KubernetesRun. And you can use Cloud storage or Git storage classes as Storage to ensure that even if something changes in your flow file, you don't have to rebuild the image - the FlowRunner will grab the latest version of your flow code at runtime. But you can really mix and match it as you wish. There are so many possibilities! 🙂 You can find some of those here.
h

haf

10/20/2021, 10:43 AM
I have tried it
That's why I'm asking for help now
It's not working
I have moved to k8s
I have a dockerfile
and no, the deps aren't static, they also change
I want to package them together with the code.
I want to rebuild the image (called DevSecOps)
I know pretty well what I want.
What I don't know how to make Prefect do, is to pick up the flow file from the docker image.
Sorry if this sounds "angry", but it's just where I'm at right now. I know you're helping me, but "safest way" to use "LocalRun+LocalStorage" doesn't make any sense when I'm telling you explicitly I want to run this on k8s
a

Anna Geller

10/20/2021, 10:45 AM
@haf in that case Docker storage is ideal because the image gets rebuilt any time you register your flow. Do you have by any chance a Flow and configuration I could use to reproduce your issue? like one complete example that you could share so that I can run it on my machine and try to reproduce and identify the issue
h

haf

10/20/2021, 10:45 AM
Yes, that's probably what I should do next.
Let me work on that 🙂
a

Anna Geller

10/20/2021, 10:46 AM
that would be awesome! Thank you! 👍
h

haf

10/20/2021, 10:46 AM
Docker storage is ideal because the image gets rebuilt any time you register your flow
No, it's not ideal - I don't want this, because:
Prefect uses `requirements.txt` but doesn't support editable dependencies.
So either I set up and host my own PyPI, which I'm not doing right now, or I collapse everything into a single huge file (which I had before, but when you make more than one flow, this is unmaintainable), OR I publish the package, which I'm not doing.
HENCE, if I could make the prefect agent just call a python file, and let python handle everything related to packaging, I'd be so happy 🙂
Also because Prefect's docker storage will pickle your classes, and this doesn't support editable deps.
If you look at the logs from the docker storage you'll see `.prefect` files being added as part of the build; AFAIK these are the pickled files.
a

Anna Geller

10/20/2021, 10:49 AM
I'm sure you can build your package inside of the Docker image. You would need a setup.py and then `RUN pip install .`
h

haf

10/20/2021, 10:50 AM
pip install -e .
yes
a

Anna Geller

10/20/2021, 10:50 AM
not sure what would be an editable dependency in a Docker image - once the image is built, no one will edit it any more, what am I missing?
h

haf

10/20/2021, 10:50 AM
But `pipenv` does this as part of `pipenv install --deploy`.
What you're missing is that when you do `pip install .` you're building a `.egg` file, but when you do an editable install you're not.
Instead, you add the path of the module to PYTHONPATH and that lets you use the code (AFAIK).
That said, perhaps a solution is indeed to locally build .egg files and have DockerStorage pickle them.
no... wait, that failed because when you register, it has to be done with `flow.register()`.
Let me see if I can come up with a repro.
Or in other words, "editable" just means "load files from disk at runtime", while `pip install .` means load it from the egg file.
a

Anna Geller

10/20/2021, 10:55 AM
sure, but who will edit the package inside of a Docker image? I don't understand why I would want a package to be editable in a Docker image. What build artifact it generates (whether an .egg or wheel file) is an implementation detail - a Docker image is a packaging mechanism by itself. The result will be the same: a package is installed in the env so that it can be used by Prefect flows, right?
h

haf

10/20/2021, 10:55 AM
You're correct that noone "will edit" it, but it's more about the package lookup mechanism that Python uses than the use-case for the programmer.
a

Anna Geller

10/20/2021, 10:56 AM
ok, we’re on the same page about that.
h

haf

10/20/2021, 10:57 AM
Unless you're packaging the package and installing the package from source while you're building the dockerfile, you don't have the package in PYTHONPATH
All of this is implementation details
It would be neat if the ValueError that Prefect throws would, e.g., tell you where it looked, its current working directory, and/or its PYTHONPATH
Docker image is a packaging mechanism by itself
Yes, but not "package" as in "python package"
a

Anna Geller

10/20/2021, 10:59 AM
sure, let’s focus on the flow code so we can reproduce. I think your dependencies are fine, only Storage of flows needs further introspection 👍
h

haf

10/20/2021, 10:59 AM
The result will be the same: a package is installed in the env so that it can be used by Prefect flows, right?
But yes, with the Dockerfile I posted the python packages can be referenced by Prefect flows; and this is what is needed
Right now, the problem is just pointing to the flow file on disk
I have to eat a bit though 🙂
a

Anna Geller

10/20/2021, 11:02 AM
As mentioned before, the GKE cluster doesn't have access to a local path on your computer, so I don't think this is a good path to follow. Either copy all local dependencies into a container image and point the Docker storage in the flow to that path in the container, OR leverage another remote storage mechanism such as GCS, which you decided you don't want.
I think you meant the same - a path in the image - but you only said local path
h

haf

10/20/2021, 11:03 AM
Again I’m not using GCS so let’s stop talking about that please.
a

Anna Geller

10/20/2021, 11:03 AM
Enjoy your meal and take your time to build a reproducible example! 🙂
h

haf

10/20/2021, 11:04 AM
I’ll try a few different permutations soon :)
a

Anna Geller

10/20/2021, 11:04 AM
sure, as I mentioned, this is not the path we follow here
h

haf

10/20/2021, 11:05 AM
Well it’s not a good path to follow ;) Everything is moving towards immutable infrastructure and reproducible builds.
Finally running!
a

Anna Geller

10/21/2021, 8:43 AM
Nice work! If you want to share your solution, you are welcome to do so.
🙌 1
h

haf

10/21/2021, 8:44 AM
Yaks shaved:
• the `tini` entrypoint, which threw away all ENVs
• this meant `pip` ran with the system pip, not the venv pip
• you're right that it would have been better to install with requirements.txt - but only insofar as pipenv doesn't tie into `pyproject.toml`, which seems to be "the way" nowadays after PEP 518: https://www.python.org/dev/peps/pep-0518/
The biggest problem, undocumented, that had me reading code was this:
docker = Docker(
    path="/app/flows/run_mmm.py",
    registry_url="ex/cd",
    dockerfile="Dockerfile",
    image_name="data-pipelines",
    image_tag=args.commit_ref,
    ignore_healthchecks=True,
    stored_as_script=True,
)
docker.add_flow(flow)
flow.storage = docker
Because without an explicit `add_flow` call, it just crashes with the message I showed you before.
Also, this really needs docs.
a

Anna Geller

10/21/2021, 8:50 AM
Thanks for sharing @haf. So far it’s documented here.
h

haf

10/21/2021, 8:59 AM
I don't see any "ignore_healthchecks" there, those docs "build" the storage, there are no local editable deps there, and there's no mention of building docker containers either - so it's not.
Those docs presume I use python to build the container.
a

Anna Geller

10/21/2021, 9:34 AM
thanks for your feedback @haf, I will look into how we can include that.
h

haf

10/21/2021, 10:40 AM
I think it might be worthwhile to look into docs specifically on building and maintaining prefect as part of a CI/CD / devsecops pipeline and what invariants / requirements this would bring with it.