prefect-community
  • a

    Adam Brusselback

    03/09/2021, 11:12 PM
    That all relies on the dependencies being there for you to use it though.
  • t

    Trevor Kramer

    03/10/2021, 12:15 AM
    I am using BatchSubmit and I need to pass parameters to the job, so I am passing:
    n_estimators = Parameter('n_estimators', default=500)
    BatchSubmit()(..., batch_kwargs={
        'parameters': {'blah': n_estimators}})
    The issue is that Batch requires parameter values to be strings, but I want the input Parameters to be integers. Is there a way to convert a Parameter from one type to another? Should I have a task that takes the parameter and returns the string version?
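    A small conversion task is a reasonable approach: a Parameter's value only exists at run time, so any conversion has to happen inside a task rather than at flow-definition time. Below is a minimal sketch of the conversion logic as a plain function, assuming every Batch parameter should be sent as a string (the AWS Batch SubmitJob API types `parameters` as a string-to-string map). The function name is hypothetical; in a flow it would be decorated with `@task` and its output used to build `batch_kwargs` in place of the raw Parameter (not tested here).

```python
from typing import Any, Dict, Mapping


def stringify_batch_parameters(params: Mapping[str, Any]) -> Dict[str, str]:
    """Return a copy of ``params`` with every value converted to ``str``.

    AWS Batch job parameters are a string-to-string map, so integer
    values such as ``n_estimators`` are converted before submission.
    """
    return {name: str(value) for name, value in params.items()}


# Usage: an int Parameter value becomes the string Batch expects.
print(stringify_batch_parameters({"blah": 500}))  # {'blah': '500'}
```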
  • h

    Hui Zheng

    03/10/2021, 12:29 AM
    Hello Prefect team. I wonder how to properly raise a FAIL signal from within a task without it being retried. Retries are mostly for handling intermittent failures. When we set retries=3 for task_A, we want task_A to be retried up to 3 times when it fails due to a network connection issue or some other intermittent failure. However, task_A also contains logic that explicitly and intentionally raises a FAIL_A signal for downstream tasks. That FAIL_A is not meant to be retried by task_A. When task_A encounters this intentional FAIL_A, it should skip the retries and return the failure directly. How could I do that?
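    The behaviour being asked for is a retry policy that separates transient errors from intentional, permanent ones. The sketch below shows that policy in plain Python, independent of Prefect: `PermanentFailure` and `run_with_retries` are hypothetical names, and the block does not show how Prefect's own task retry settings would be configured to behave this way.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class PermanentFailure(Exception):
    """Hypothetical marker for an intentional failure (like FAIL_A) that must not be retried."""


def run_with_retries(
    fn: Callable[[], T],
    max_retries: int = 3,
    retry_delay: float = 0.0,
    transient: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Call fn, retrying up to max_retries times on transient errors only.

    Exceptions that are neither transient nor PermanentFailure also
    propagate immediately, because no except clause catches them.
    """
    retries_used = 0
    while True:
        try:
            return fn()
        except PermanentFailure:
            # Intentional failure: re-raise at once without consuming retries.
            raise
        except transient:
            if retries_used >= max_retries:
                raise  # Out of retries: surface the last transient error.
            retries_used += 1
            time.sleep(retry_delay)
```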
  • c

    Carl

    03/10/2021, 4:20 AM
    I’m trying to use a parameter as a condition in a flow. What I’m finding is that if the condition is False, every task after it is skipped.
    from prefect import Flow, Parameter, case
    
    # get_data, do_thing_task and do_another_thing are @task functions defined elsewhere
    with Flow('MyFlow') as flow:
        do_thing = Parameter('do_thing', default=False, required=False)
    
        data = get_data()
    
        with case(do_thing, True):
            data = do_thing_task(data)
    
        data = do_another_thing(data) # This gets skipped when do_thing = False
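    The skipping comes from skip propagation: inside `case`, `data` is rebound to `do_thing_task`'s result, so when that task is skipped, `do_another_thing` inherits the skip. The usual Prefect 0.x idiom is to add a `case(do_thing, False)` branch and combine both branches with `merge` (from `prefect.tasks.control_flow`), which, as I understand it, returns the first non-`None` branch result. The plain-Python model below only illustrates that selection rule; it is not Prefect code, and using `None` to mean "skipped" is an assumption of the model.

```python
from typing import List, Optional, TypeVar

T = TypeVar("T")


def merge(*branch_results: Optional[T]) -> Optional[T]:
    """Return the first branch result that is not None (None models a skipped branch)."""
    return next((r for r in branch_results if r is not None), None)


def run_flow(do_thing: bool, data: List[str]) -> List[str]:
    # Branch 1 runs only when do_thing is True; otherwise it is "skipped" (None).
    transformed = data + ["do_thing_task"] if do_thing else None
    # Branch 2 passes data through unchanged when do_thing is False.
    passthrough = list(data) if not do_thing else None
    # Downstream work runs on whichever branch actually produced a value.
    return merge(transformed, passthrough) + ["do_another_thing"]
```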
  • s

    Soren Daugaard

    03/10/2021, 1:03 PM
    Is there a way to delete artifacts from a specific task run? I see a delete_artifact method here: https://docs.prefect.io/api/latest/artifacts/artifacts.html#functions but it is not clear to me how I would obtain the task_run_artifact_id to use with it.
  • v

    Vitalik

    03/10/2021, 1:40 PM
    Hi – I have a basic flow of flows, with two flows inside it running sequentially. Tested in a Core environment, everything finished as expected. Running the same in Cloud immediately gives the error below. Interestingly, the first flow actually runs and finishes successfully (visible in the Runs screen of that flow). However, both flows are marked as failed in the group flow. It feels like a bug, but I could be overlooking something. Any help will be greatly appreciated!
    The flow of flows is created as follows (based on the https://docs.prefect.io/core/idioms/flow-to-flow.html tutorial):
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun
    
    flow1_trig = StartFlowRun(flow_name='flow1', project_name='prj', wait=True)
    flow2_trig = StartFlowRun(flow_name='flow2', project_name="prj", wait=True)
    with Flow("group") as flow_grp:
        flow2_trig.set_upstream(flow1_trig)
    Error:
    Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
    Traceback (most recent call last):
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
        return run_method(self, *args, **kwargs)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
        run_link = client.get_cloud_url("flow-run", flow_run_id)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/client/client.py", line 887, in get_cloud_url
        tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/client/client.py", line 920, in get_default_tenant_slug
        res = self.graphql(query)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/client/client.py", line 318, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
  • l

    liren zhang

    03/10/2021, 1:44 PM
    Hi all, I have written a sample flow which retrieves the flow code from GitHub and executes it on a Docker agent that lives on an EC2 instance. The log shows that the code was retrieved correctly from GitHub, but execution failed with a weird error:
    Failed to load and execute Flow's environment: NameError("name 'Local' is not defined")
    I am not entirely sure where the name Local comes from. Here is my sample code for reference:
    from prefect import task, Flow
    from prefect.run_configs import DockerRun
    from prefect.storage.github import GitHub
    
    @task
    def say_hello():
        print("Hello, world!")
    
    with Flow(name="My first flow with Docker agent", storage=GitHub(repo="bbbb/aaaa-bdp", path="/PREFECT/hello_world_github.py", access_token_secret="GIT_ACCESS_TOKEN")) as flow:
        say_hello=say_hello()
    flow.run_config=DockerRun(labels=['prefect.aaaa.com'])
    
    #flow.run()
    flow.register("first_prefect_project")
  • a

    Adam Brusselback

    03/10/2021, 3:12 PM
    Just wondering if anyone has used Prefect with Hashicorp Vault before? Any tricks to share?
  • s

    Samuel Hinton

    03/10/2021, 3:34 PM
    Hi team! Given that there's no support at the moment for flow concurrency in prefect-server, I'm trying to make sure that a large influx of late tasks doesn't swamp our server and bring it down. Is there a way to automatically cancel tasks that are more than a given timedelta late? For example, if a task is 1 minute late, go ahead and run it, but if it's 10 hours late, cancel it.
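    One way to express "run if slightly late, cancel if very late" is a check at the start of the flow that compares the scheduled start time with the current time. The helper below is a hypothetical, framework-independent sketch. Wiring it into Prefect (for example, reading the scheduled start from `prefect.context` and raising a `SKIP` signal from a first task) is an assumption to verify against your Prefect version, and skipping this way still starts the flow run rather than preventing it from being submitted.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def should_run(
    scheduled_start: datetime,
    max_lateness: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Return True if the run is no more than ``max_lateness`` behind schedule.

    Both datetimes must be timezone-aware so they can be compared safely.
    """
    now = now if now is not None else datetime.now(timezone.utc)
    return now - scheduled_start <= max_lateness
```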
  • s

    Steve Aby

    03/10/2021, 3:57 PM
    Good morning. I have a newbie, general, and maybe best-practice question that I am looking for opinions on. I have two main processes, process A and process B. Process B depends on process A completing, but each process is very distinct. In Prefect, is it best to 1) put all the tasks from both processes in a single flow and just run them sequentially; 2) have two separate flows with a dependency between the flows; or 3) does it not matter? Thanks, just looking for viewpoints.
  • a

    Aaron Goodrich

    03/10/2021, 8:35 PM
    Hey guys, I'm pretty new to Prefect, so there's probably an easy solution here, but I'm trying to move to Prefect from our current system of standalone Dockerized ETL integrations. Normally I use docker compose and mount a directory into the container so I can get any CSVs I generate back out of it. In Prefect, I see I can throw my script into an image at runtime with
    from prefect.storage import Docker
    
    flow.storage = Docker(
        path="my_flow.py",
        files={"/source/of/my_flow.py": "my_flow.py"},
        stored_as_script=True
    )
    but I don't see how I can get, say, my dynamically generated files back out. Any suggestions?
  • a

    ale

    03/10/2021, 10:16 PM
    Hey folks 🙂 Is there a way to enrich a task state with additional variables? I know this can be done when using signals, but I would like to add some custom variables to a task state even without using signals. The main reason for doing this is to consume the variables in state handlers
  • c

    Charles Liu

    03/10/2021, 11:22 PM
    Hi! Can
    flow.run_config = KubernetesRun(image="example/image-name:with-tag")
    be a remote image in a private repo?
  • m

    Matthew Blau

    03/11/2021, 12:45 AM
    Hello all, I currently have a Docker image that a flow runs, and it works well. I was hoping to learn whether, from within that image, I can at a minimum expose logging information to the Prefect UI, and possibly also have tasks contained in that image that my flow (which lives outside the image) can pick up and run. Thank you in advance!
  • m

    Maria

    03/11/2021, 4:53 AM
    Hi all, I am a bit unsure how I should deal with Late Runs (e.g. when my agent was down for some time and is now deploying flow runs one after another to catch up, ignoring the schedule). It is a test scenario and the tasks are small and don't change the target state, but in the real world I usually don't want to "catch up": in an ETL use case, if I missed 5 hourly jobs, I would process those 5 hours of data at once rather than sending 5 separate jobs for execution. Should I design flows that take care of this logic (e.g. there will still be 5 jobs, but the last 4 will do nothing because I'll have checks in place), or is there another way to achieve this?
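    Putting the check inside the flow is a workable option. A common shape for it is a watermark: persist the end of the last processed window, and have each run process everything from that point up to the most recent complete hour. The first catch-up run then handles all missed hours in one go, and the later catch-up runs find nothing left to do. The sketch below assumes hourly windows and a watermark stored somewhere durable (storage not shown); the function name is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def pending_hourly_window(
    last_processed_end: Optional[datetime], now: datetime
) -> Optional[Tuple[datetime, datetime]]:
    """Return the (start, end) window of complete, unprocessed hours, or None.

    ``end`` is ``now`` floored to the hour, so a partially elapsed hour is
    never processed. With no watermark yet, only the last complete hour is
    returned.
    """
    end = now.replace(minute=0, second=0, microsecond=0)
    if last_processed_end is not None:
        start = last_processed_end
    else:
        start = end - timedelta(hours=1)
    if start >= end:
        return None  # Already caught up: this (late) run has nothing to do.
    return start, end
```

    After a successful run, the caller would store `end` as the new watermark, so the next run starts where this one stopped.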
  • c

    CA Lee

    03/11/2021, 9:11 AM
    Hello all, I'm trying out the latest Prefect version, 0.14.12, and I'm running into this error when attempting to run a flow using the ECS agent and ECSRun:
    botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
    I have a working config for prefect agent that executes the flow without errors. However, this involves creating a `task-definitions.yaml`:
    prefect agent ecs start -t token \
        -n aws-ecs-agent \
        -l label \
        --task-definition /path/to/task-definition.yaml \
        --cluster cluster_arn
    task-definitions.yaml
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    taskRoleArn: task_role_arn
    executionRoleArn: execution_role_arn
    The flow runs without errors, so the error is not due to IAM permissions. However, when I run the ECS agent with the --task-role-arn and --execution-role-arn CLI args instead, I run into the above-mentioned error. I have also tried running the Prefect agent with --launch-type FARGATE, which I believe is the default and does not need to be specified, but this does not work either.
    prefect agent ecs start -t token \
        -n aws-ecs-agent \
        -l ecs \
        --task-role-arn task_role_arn \
        --execution-role-arn execution_role_arn \
        --cluster cluster_arn
    I have also tried passing task_role_arn and execution_role_arn into ECSRun() within my flow, and ran into the same error. Is there any way to run the ECS agent using CLI args without the task-definition file?
  • c

    Chris Smith

    03/11/2021, 12:45 PM
    Hi all, does anyone know if it’s possible to have multiple teams and/or restrict user access to particular projects?
  • m

    Matthew Blau

    03/11/2021, 2:32 PM
    Hello all, I have a flow that creates a container, starts the container, and grabs the container logs. Unfortunately, I cannot seem to output logging information to where the Prefect Server UI can see it. What am I doing wrong? Code and visualization will be in the thread.
  • j

    Justin

    03/11/2021, 2:59 PM
    Hey guys, one question... I have experienced this on multiple systems, usually all Ubuntu Server LTS 20.04, installing with pip. On every instance it just does not work out of the box: it fails to connect to the GraphQL endpoint on localhost:4200. Sometimes setting the Docker-internal IP of the container worked, sometimes only setting the public IP worked (very insecure!), but it never works out of the box, which is very annoying. Am I missing something? The steps I usually follow:
    • new Ubuntu server
    • install docker (official repo) & docker-compose
    • apt install python3 python3-dev python3-pip
    • pip install prefect
    • prefect backend server
    • prefect server start
    • prefect agent local start
    Then I don't get a connection; instead it redirects me to the "Welcome to your prefect ui" screen, where I try one of the above-mentioned IPs and, if I'm lucky, it works. If not, it will not work at all on that machine, even after pruning Docker and reinstalling everything. curl against localhost:4200, 127.0.0.1:4200 and dockerinternalip:4200 all work fine.
  • h

    haf

    03/11/2021, 3:58 PM
    Has anyone encountered and solved
    Unexpected error while running flow: KeyError('Task slug resolve_profiles_dir-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
    — I set up all agents/cli:s two days ago at their latest versions, and the agent is a k8s one. I'm using Prefect cloud. This happens when I register the flow as such and then run it from the UI (I verified the UI archives the old version and I'm triggering a run of the updated version)
    #!/usr/bin/env python
    
    from logging import getLogger
    from datetime import timedelta
    from os import getenv
    from pathlib import Path
    from pendulum import today
    from prefect.engine.state import Failed
    from prefect.schedules import IntervalSchedule
    from prefect.storage import Docker
    from prefect.utilities.notifications import slack_notifier
    from prefect.utilities.storage import extract_flow_from_file
    
    logger = getLogger("dbt.deploy")
    
    with open('requirements.txt') as file:
        packages = list(line.strip() for line in file.readlines())
    
    docker = Docker(
        registry_url="europe-docker.pkg.dev/projecthere/cd",
        # dockerfile='Dockerfile', # Uncomment to use own Dockerfile with e.g. dependencies installed
        image_name="analytics-dbt",
        image_tag="0.14.11",
        python_dependencies=packages
    )
    
    slack = slack_notifier(
        only_states=[Failed],
        webhook_secret='SLACK_WEBHOOK_URL')
    
    every_hour = IntervalSchedule(
        start_date=today('utc'),
        interval=timedelta(hours=1))
    
    flows = sorted(Path('flows').glob('*.py'))
    
    # Add flows
    for file in flows:
        flow = extract_flow_from_file(file_path=file)
        logger.info('Extracted flow from file before build')
    
        docker.add_flow(flow)
    
    # Build storage with all flows
    docker = docker.build()
    
    # Update storage in flows and register
    for file in flows:
        flow = extract_flow_from_file(file_path=file)
        logger.info('Extracted flow from file after build')
    
        flow.storage = docker
        flow.state_handlers.append(slack)
        flow.schedule = every_hour
    
        logger.info('Registering...')
        flow.register(
            project_name='dbt',
            build=False,
            labels=['prod'],
            idempotency_key=flow.serialized_hash(),
        )
  • j

    Jack Sundberg

    03/11/2021, 4:17 PM
    How does your team advise for scaling concurrency in Agents? My LocalAgent crashes once too many flows are submitted and concurrently running (roughly 300) -- and I believe this is because too many subprocesses are created, causing my computer to kill the parent process.
  • j

    Justin Chavez

    03/11/2021, 5:21 PM
    Hi all, I am trying to run multiple linked StartFlowRuns that share results from some of the tasks of one flow with the next. I was looking at the previous answer (https://prefect-community.slack.com/archives/CL09KU1K7/p1607733221071000?thread_ts=1607727047.066800&cid=CL09KU1K7), but it is not working for me, as StartFlowRun returns a Task, not a State, in my case. Is there a new solution for this?
  • h

    haf

    03/11/2021, 5:54 PM
    Thanks for bearing with all my questions. I have one about k8s agents. Seeing that they are stateless pods, is there a way to set the agent id? https://docs.prefect.io/orchestration/agents/kubernetes.html#kubernetes-agent — this page doesn't mention anything about it, and it would seem that once the pod with the agent is restarted it gets a new id.
  • j

    John Grubb

    03/11/2021, 6:27 PM
    Hi there 👋, a question about scheduling and parameters in the UI. What we'd really like to do is set up multiple schedules for a given flow, each with its own set of parameters. Has this ever been mentioned before? The use case is that we want to reuse a flow we've built without having to register it multiple times or copy it in the codebase.
  • c

    Charles Liu

    03/11/2021, 7:57 PM
    Hey all, just looking for some further clarification on the proper usage of storage = Docker() and KubernetesRun() together. Could someone shed some light on why I can deploy and run a flow with storage=Docker(...dockerfile="my dockerfile") [with confirmed EC2 activity and successful front-end runs], but when I mute that dockerfile param and instead try to use KubernetesRun(image="my image here") with the same kind of registry as the Docker storage destination, my private packages are missing? In my case, both images are built from the same Dockerfile; the latter scenario just involves pushing to a repo first and attempting to call it with KubernetesRun().
  • f

    Frederick Thomas

    03/11/2021, 10:31 PM
    Greetings and salutations,
  • a

    Alex Welch

    03/12/2021, 4:03 AM
    Hello, I am running into an issue with an ECS Agent. I get the below error on my flow run. I have checked and found the 3.8.5 version (it was the Docker container I built the flow in), but I cannot determine where the 3.7.10 version is. I looked in the EC2 instance that I set up and launched the agent from, and it is at 3.8.7. Where else is Prefect running the flow that may have the 3.7 Python version, and is there any way around this other than upgrading that to 3.8?
  • a

    Akash Rai

    03/12/2021, 4:40 AM
    Hey guys. Pretty new to this ETL stuff, so forgive my innocence on this topic. I need this orchestration for one of my projects, with the following use cases:
    - The tasks are shell scripts.
    - Each shell script passes a file path to its descendant in the DAG to proceed with the flow run.
    - In case of a failure at task A, instead of restarting the whole run, pass the file path to task A dynamically via the UI or CLI and proceed with the execution, using the previous outputs of A's ancestors.
    - Have a sub-DAG that logically groups my tasks. In the UI this would also help me group the tasks.
    One question I have regarding states: are state changes atomic operations? Any help is highly appreciated.
  • j

    Jacob Blanco

    03/12/2021, 7:23 AM
    Hey folks, we’re seeing issues with a flow that requires a manual approval step in Cloud. When we click the button we get an error notification with an empty message, and the logs show that approval was given but the flow does not continue.
  • f

    Florian Kühnlenz

    03/12/2021, 10:31 AM
    Hi, I played around some more with Prefect, but I could not wrap my head around how to transition from our current system.
    Currently:
    • multiple Docker containers separated by concerns/topics
    • (too many) cron jobs
    Goal:
    • multiple Docker images
    • central orchestration
    Migration:
    1. add prefect to project dependencies
    2. rewrite cron jobs as flow(s)
    3. build image (automated)
    4. register jobs (automated)
    I cannot get a clear understanding of how to accomplish 3 and 4. We definitely also need an image to spin up in other environments. Do I build the image normally, push it to the registry and use it in the run_config? If so, how do I register the flow, and with what storage config? I assume I will run register from within the container, but what is the storage config in that case? Any help would be appreciated. I will try to write about our transition once I figure it out. I believe we are not the only ones with a setup like this.

Michael Adkins

03/12/2021, 2:57 PM
Hey @Florian Kühnlenz -- it may help to take a look at https://github.com/PrefectHQ/prefect/discussions/4042
But basically the answer is: yes, you can build your base Docker image like normal, reference it from a DockerRun config, and then store your flow on S3 or GitHub.

Florian Kühnlenz

03/12/2021, 3:01 PM
Hi @Michael Adkins, thanks for the link! But S3 or GitHub are not an option. I would really prefer that everything be contained in the same container, or several if necessary.

Michael Adkins

03/12/2021, 3:02 PM
Then you can use DockerStorage, and Prefect will build the image at registration time and package your flow into it.
This will automatically work with a Docker agent without a run config.

Florian Kühnlenz

03/12/2021, 3:03 PM
Okay, so I would build my image as normal, and then let prefect build another for the flow?

Michael Adkins

03/12/2021, 3:04 PM
I think you may want to set the base image for the flow's DockerStorage to your image in that case
I'm not quite sure what your setup looks like though!

Florian Kühnlenz

03/12/2021, 3:05 PM
I tried to explain as best I could. We have a bunch of containers running cron jobs, all with different dependencies. Now we want to migrate to Prefect.
I think if I try to register the flow with DockerStorage and build=True, I will also run into the Docker-in-Docker problem, because in our CI tool I can only add Python dependencies inside the Docker image.

Michael Adkins

03/12/2021, 3:36 PM
If your CI doesn't support building a docker image then I'm not sure you can avoid using Github/S3/something as a storage for your serialized flow. You can use Local storage if you don't want to rely on an external service but then you'll need to copy the serialized flow files to your execution environment.

Florian Kühnlenz

03/12/2021, 5:11 PM
No, you misunderstood. The CI can build Docker images for sure. But I cannot install Prefect within the CI environment, except inside the image I am building. So the only way to run Prefect is inside the image. If Prefect then tries to build, I assume it needs to build the image inside the image, right?

Michael Adkins

03/12/2021, 5:22 PM
Ah sorry about that. Yes, you are correct. For this reason, we don't recommend trying to use DockerStorage if you cannot install/run prefect outside a Docker container. You could work around this by adding the flow manually to your docker image but we don't have a guide written for this because it's not a typical pattern. I'll see if I can get a description of what you'd have to configure.

Florian Kühnlenz

03/12/2021, 5:26 PM
That would be great! I feel like the situation we are in is not that atypical if you have to keep things on premises.
I got as far as registering the flow via CI and the Docker Agent fetching the correct image. However the execution does not work. I get:
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')

Michael Adkins

03/12/2021, 8:31 PM
Hey! I ran into a bit of a hiccup with this, but it seems sorted 🙂 Here's the flow file, test-docker-storage.py:
from prefect import Flow, task
from prefect.storage import Local
from prefect.run_configs import DockerRun

@task(log_stdout=True)
def say_hello():
    print("Hello world")

with Flow("docker-storage-example") as flow:
    say_hello()

flow.storage = Local(stored_as_script=True, path="/flow.py", add_default_labels=False)
flow.run_config = DockerRun(image="image-with-flow:latest")

if __name__ == "__main__":
    flow.register("default")
Here's the minimal Dockerfile
FROM prefecthq/prefect:latest

ADD test-docker-storage.py /flow.py
Oh, and I built the Docker image from that Dockerfile with --tag image-with-flow:latest

Florian Kühnlenz

03/12/2021, 8:53 PM
Many thanks! I was wondering if the local storage is the way to go. I might only be able to try it out on Monday, but I will report back.

Michael Adkins

03/12/2021, 8:58 PM
@Marvin archive "Storing and running flows in Docker without building DockerStorage at registration time"

Marvin

03/12/2021, 8:58 PM
https://github.com/PrefectHQ/prefect/issues/4248