Powered by Linen
prefect-community
  • j

    Jake

    03/03/2022, 2:43 PM
    We have a parameter that gets passed to some of our tasks (like which DB endpoint to point to); when this value changes, re-registration won’t happen (since it doesn’t count as any of the metadata) but how can I make it so that this change does trigger a re-registration?
    a
    k
    48 replies · 3 participants
  • k

    Keith Veleba

    03/03/2022, 3:15 PM
    hello, I'm running flows in AWS ECS and I'm using the "prefecthq/prefect:latest-python3.8" image in my flow's ECSRun run_config. I just added a PostgresFetch task and now I'm getting an error that the extras are not installed in the prefect image I'm using. Is there an alternative base image I should be using? TIA
    a
    m
    +1
    44 replies · 4 participants
  • x

    Xavier Babu

    03/03/2022, 3:52 PM
    Hi Prefect Community, Please provide available tutorial or doc link to integrate Prefect 2.0 with Tomcat Web/App Server.
    k
    9 replies · 2 participants
  • d

    Dominick Olivito

    03/03/2022, 3:54 PM
    hello, i'm trying to run on GCP GKE with a custom image. the pod is dying immediately with these errors in the logs:
    /home/flex/.local/bin/prefect: line 3: import: command not found
    /home/flex/.local/bin/prefect: line 4: import: command not found
    /home/flex/.local/bin/prefect: line 5: from: command not found
    /home/flex/.local/bin/prefect: prefect: line 7: syntax error near unexpected token `('
    /home/flex/.local/bin/prefect: prefect: line 7: `    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])'
    it looks like it's finding and parsing the `prefect` executable file but not running it with `python`. when I run a local container using the image, i'm able to successfully call the command `prefect` and run a flow. `prefect` is in the path of active user (`flex`). i'm also able to run basic flows successfully on GKE using prefect's base image, so the issue is specific to our custom image. do you have any suggestions on what we can check in our custom image?
    k
    a
    5 replies · 3 participants
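Messages such as `import: command not found` are what a shell prints when it interprets Python source itself. One thing that could be checked inside the custom image is the first line of the `prefect` script. The sketch below is only a diagnostic aid under that assumption, not a confirmed cause: `describe_shebang` and `check_script` are hypothetical helper names, and another possibility is that the image's entrypoint hands the command to a shell.

```python
import os
import shlex


def describe_shebang(script_path):
    """Return the interpreter named on a script's first line, or None."""
    with open(script_path, "rb") as handle:
        first_line = handle.readline().decode("utf-8", errors="replace").strip()
    if not first_line.startswith("#!"):
        return None
    parts = shlex.split(first_line[2:])
    return parts[0] if parts else None


def check_script(script_path):
    """Summarize whether the script's interpreter exists in this environment."""
    interpreter = describe_shebang(script_path)
    if interpreter is None:
        return "no shebang line: a shell would try to run the file itself"
    if not os.path.exists(interpreter):
        return f"shebang points at missing interpreter {interpreter}"
    return f"shebang interpreter {interpreter} exists"
```

Run inside the pod's image, `check_script("/home/flex/.local/bin/prefect")` would report which interpreter the script expects and whether that path exists there.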
  • v

    Vadym Dytyniak

    03/03/2022, 4:50 PM
    Hi. Could you please provide minimum IAM permissions required for Prefect ECS agent?
    k
    3 replies · 2 participants
  • c

    Christian Nuss

    03/03/2022, 4:51 PM
    anyone have a lil cheatsheet snippet for a `KubernetesRun` defining the `job_template` as a dict?
    k
    15 replies · 2 participants
  • p

    Prasanth Kothuri

    03/03/2022, 5:15 PM
    Hi All, I would like to write a pandas dataframe as csv to s3 in prefect, shouldn't this work?
    # upload to s3
    write_to_s3 = S3Upload(
        bucket=s3_bucket,
        boto_kwargs=dict(
            endpoint_url=os.getenv("s3_endpoint"),
            aws_access_key_id=os.getenv("s3_access_key"),
            aws_secret_access_key=os.getenv("s3_secret_key")
        )
    )
    
    output = write_to_s3(results.to_csv(index=False), key=file_name)
    k
    a
    3 replies · 3 participants
  • a

    Aram Panasenco

    03/03/2022, 6:16 PM
    I used task looping in a really long flow that failed 6 hours in due to an obscure error. I was disappointed to see that when I restarted it from the Prefect Cloud UI, it restarted from the beginning rather than from the latest state. Is there a way to preserve "task_loop_result" context and use it in the task when manually restarting a failed flow?
    k
    16 replies · 2 participants
  • i

    Ifeanyi Okwuchi

    03/03/2022, 8:03 PM
    Hello all. I tried using `create_flow_run.map()` with `wait_for_flow_run()` and `get_task_run_result()` but I'm getting some errors. I'm not sure if I'm doing this correctly
    a
    19 replies · 2 participants
  • s

    Seth Just

    03/03/2022, 8:15 PM
    We are running into an issue with flow runs hanging after a task fails -- a task directly downstream from the failure ends up stuck in a Pending state even though all its upstream tasks are complete. We're using an `ECSRun` with a `LocalDaskExecutor` and I'm unable to find any way to debug or diagnose why the task isn't ever executed.
    k
    22 replies · 2 participants
  • s

    Sam Werbalowsky

    03/03/2022, 9:40 PM
    Continuing to be confused by parameters here…is there any easy way to dynamically use parameters as strings for various task inputs? I.e. I have a `param=Parameter("myvalue", default="myvalue")`. I want to use it in a variety of tasks, say…
    execute = execute_file(f"sql/{param}.sql")
    upload = upload_file(f"{param}.csv")
    Is there a way to do this without constructing a task for each input?
    k
    13 replies · 2 participants
  • y

    YD

    03/03/2022, 9:53 PM
    Passing Pandas DF to task: if I have `task_1` return a df and I pass the df as an input to `task_2`, does the df physically go to the Prefect server? The issue I have is that it looks like it is getting stuck when trying to return the df and pass it to the next task. Is there a better way to pass a df from one task to the next?
    @task()
    def task_1():
        df = ....
        return df
    
    @task()
    def task_2(df):
        df2 = df ....
        return df2
    
    def main():
        with Flow("voc_sentiment") as flow:
            df = task_1()
            df2 = task_2(df)
    k
    y
    25 replies · 3 participants
  • d

    Daniel Saxton

    03/04/2022, 12:50 AM
    according to the docs it's possible to define individual environment variables at runtime for a given agent by using the `--env` flag, but is there also something like an `--env-file` flag if you have several environment variables defined in a file (kind of like you have with docker-compose)? https://docs.prefect.io/orchestration/getting-started/flow-configs.html#configure-environment-variables
    :discourse: 1
    k
    a
    +1
    6 replies · 4 participants
  • m

    Matt Alhonte

    03/04/2022, 1:22 AM
    Heya! Trying to make a flow that does some GraphQL stuff - specifically, turning off schedules (via `set_schedule_inactive`). It lets me read (I grab all the flows from a given project), but the `mutate` gives me this error (the query works when I go to the Interactive tab on the GUI btw)
    AuthorizationError([{'path': ['flow'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}])
    k
    a
    17 replies · 3 participants
  • s

    Sen

    03/04/2022, 5:30 AM
    Hey, good morning everyone. Has anyone tried to use NVIDIA-DOCKER as a Prefect agent to run some flows? I have got the setup ready and running, but when the task which needs to use the GPUs starts executing, I see it doesn't use the GPU, but instead it runs on CPU. I can see the below logs

    https://pasteboard.co/kpsNsqpN3aXP.png

    a
    k
    49 replies · 3 participants
  • t

    Thomas Opsomer

    03/04/2022, 4:22 PM
    Hello community 🙂 We experienced a strange behaviour of the "restart" button on prefect cloud: When going to a task run and restarting the task, it correctly reschedules the task, but it doesn't reschedule the downstream tasks (in the UI they stay green instead of light blue)
    k
    m
    20 replies · 3 participants
  • k

    kevin

    03/04/2022, 5:14 PM
    hey guys would anyone have a simple example of using a GQL query to trigger a flow run I can look at?
    k
    5 replies · 2 participants
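A minimal sketch of what such a request might look like, built with the standard library only and not sent anywhere. The `create_flow_run` mutation with a `flow_id` input follows the Prefect 1.x GraphQL API as best recalled here, and the endpoint URL, API key and flow ID are placeholders, so all of them should be treated as assumptions to verify in the Interactive API tab. `build_create_flow_run_request` is a hypothetical helper name.

```python
import json
import urllib.request


def build_create_flow_run_request(api_url, api_key, flow_id):
    """Build (but do not send) a GraphQL POST that asks for a new flow run.

    The mutation and field names are assumptions to check against
    the GraphQL schema before use.
    """
    # json.dumps quotes and escapes the ID as a string literal.
    query = (
        "mutation { create_flow_run(input: { flow_id: %s }) { id } }"
        % json.dumps(flow_id)
    )
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        api_url,
        data=body,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values for illustration only.
request = build_create_flow_run_request(
    "https://api.prefect.io", "MY_API_KEY", "my-flow-id"
)
print(json.loads(request.data)["query"])
```

Passing the returned object to `urllib.request.urlopen` would actually send it; the response body would then contain the new flow run's `id` if the mutation is accepted.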
  • l

    Ling Chen

    03/04/2022, 5:46 PM
    Hello ! We have some memory intensive flows. These flows sometimes are triggered at the same time and executed through a LocalExecutor. The LocalExecutor will immediately launch all flow runs and cause memory errors. I wonder if there is a way to make LocalExecutor or any executor to execute one flow run at a time (sequentially) rather than launch all of flow runs at the same time?
    k
    5 replies · 2 participants
  • k

    Ken Nguyen

    03/04/2022, 6:15 PM
    I’ve built a flow that continues to be stuck in the ‘Submitted’ state. Following this documentation, I was able to narrow down that the cause is with my docker image. I have multiple docker images in my repo that work fine, but using this specific docker image in my ECSRun causes my flow to be stuck in a submitted state. Could anyone let me know what might be wrong with this docker image?
    FROM ubuntu:18.04
    COPY requirements.txt /requirements.txt
    COPY google_secret.json $HOME/.config/gspread_pandas/google_secret.json
    ENV PATH="/root/miniconda3/bin:$PATH"
    ARG PATH="/root/miniconda3/bin:$PATH"
    RUN apt-get update
    
    RUN apt-get install -y wget && rm -rf /var/lib/apt/lists/*
    
    RUN wget \
        https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \
        && mkdir /root/.conda \
        && bash Miniconda3-latest-Linux-x86_64.sh -b \
        && rm -f Miniconda3-latest-Linux-x86_64.sh 
    RUN conda install pip
    RUN pip install -r requirements.txt
    RUN conda install -c conda-forge theano-pymc -y
    k
    a
    23 replies · 3 participants
  • k

    Ken Nguyen

    03/04/2022, 6:39 PM
    I have a noob question, are you able to add a `@task` decorator on a function imported from a library (as opposed to one I defined)?
    k
    3 replies · 2 participants
  • j

    Jack Chang

    03/04/2022, 6:40 PM
    I'm trying to return three pandas dataframes in a task (aka function) and it gives me this
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    Can anyone point me to an example where I can make it iterable?
    k
    2 replies · 2 participants
  • h

    Hedgar

    03/04/2022, 6:59 PM
    My flow failed, and when I checked, the error is the exception I inserted to check for completeness of the data being extracted. I actually did it because in the past, without this check, incomplete data got extracted. Could it be the way I raised the ValueError exception? Is there a way in Prefect to do exceptions that would suit my case? And secondly, is there a way I can link this exception to a retry mechanism? That is, if this occurred, the flow could be seen as failed and therefore retry
    k
    7 replies · 2 participants
  • d

    Daniel Saxton

    03/04/2022, 7:06 PM
    small nitpick here, but when logging into Prefect Cloud the suspicious login alerts show up as coming from Okta instead of Prefect, does it make more sense to brand these as Prefect emails just to minimize confusion (and not reveal what Prefect is using on the backend)?
    k
    3 replies · 2 participants
  • l

    Leon Kozlowski

    03/04/2022, 8:40 PM
    Can I have a complex schedule that runs the same flow at the same time with different parameters?
    k
    8 replies · 2 participants
  • h

    Henry

    03/04/2022, 8:55 PM
    is it bad practice to define a parameter in a module scope rather than in the flow context?
    input_a = Parameter('a')
    
    @task 
    def abc(b, c):
      return a + b + c
    
    with Flow("abc_flow") as flow:
      # in context
      result = abc()
    k
    2 replies · 2 participants
  • j

    Jared Teerlink

    03/04/2022, 9:21 PM
    Hi Team - Trying to use a secret I created in the Cloud UI
    with Flow(name='flow_example') as flow:
        my_secret = PrefectSecret("my_password")
        data = needs_credentials(snow_pw=my_secret)
    Error that no local variable found. Need to set to reference cloud API
    k
    3 replies · 2 participants
  • m

    Martha Edwards

    03/04/2022, 9:58 PM
    Hello Prefect community! I'm new here, exploring Prefect's features for the first time. I have a question. Can Prefect tasks be long-running listener tasks, sort of like Sensors in Airflow? Any recommendations or best practices to accomplish this?
    k
    14 replies · 2 participants
  • k

    Kevin Kho

    03/04/2022, 11:19 PM
    Hi everyone, join the #events channel to keep up with Prefect events (or show/advertise your own Prefect-related event)
    :upvote: 4
  • k

    Kevin Kho

    03/04/2022, 11:30 PM
    Kicking off the #events channel with my latest Meetup presentation!
    d
    a
    4 replies · 3 participants
  • h

    Hans Lellelid

    03/05/2022, 8:39 PM
    Hi folks -- I'm exploring Prefect Orion as an option to replace Airflow. (I understand it's an evolving technical preview; this is for a longer-term R&D project.) First off, let me compliment the team on the documentation. While maybe everything isn't documented yet, the documentation is excellently written. One question I had that was not immediately clear from the docs (and I've tried some searching, but haven't turned up an answer) is whether Deployments can be invoked with arbitrary parameters? It looked from the docs like I would need to create a different Deployment for each parameter? Maybe stepping back a bit, what I want to do is use an API to trigger "on-demand" pipeline work for a given input (e.g. building map tiles for a geo region). If Deployments aren't the right way to think about this problem, would it be better to just instantiate a Flow with a parameter [via API]? (I understand this to be possible based on some docs I was reading last night.) Ultimately, we would want to kick off these ETL or longer-running jobs and be able to get information about where in the system (which task in a flow) a specific job is. It feels like Prefect is a good fit for this workflow. Thanks in advance!
    k
    m
    9 replies · 3 participants
k

Kevin Kho

03/05/2022, 8:57 PM
It wouldn’t be through the deployment interface but it would be by interacting with the REST API like you suggest. If you have parameters that you will consistently run, then yes you can have multiple deployments.
h

Hans Lellelid

03/05/2022, 9:04 PM
Ok, thank you. So a Deployment is not necessary when I'm actually deploying flows that I want to invoke via API -- I'm glad that I was correct in understanding that the deployment also includes any parameters (i.e. frozen in there), but under what conditions would I need a Deployment vs. just invoking a Flow via REST API?
k

Kevin Kho

03/05/2022, 9:10 PM
Yes, for event-driven flows you shouldn’t need one. A Deployment would be for scheduled flows (run every day), and then the scheduler will create those runs
m

Michael Adkins

03/05/2022, 10:36 PM
This isn’t quite correct. I’ll try to give some bullets that explain the use-case for deployments:
• You may run a flow by calling it in Python and it will be tracked by the API as a flow run. You can do this however you like wherever you like as long as you point it to your API.
• If you want to trigger a flow run via the API, you need to “deploy” your flow by creating a deployment.
• A deployment tells us how to get the code for a flow. Optionally, it may include a schedule to automatically trigger runs in a scheduled manner. Also optionally, it may override default parameters for the flow.
• Once you have a deployment, you can create flow runs for it via the API. At this point, you may override the flow or deployment default values for parameters.
• API triggered flow runs are submitted for execution by agents, which use a flow runner (the default is configured on the deployment but can be overridden per flow run) to create infrastructure, download your code from the location specified in the deployment, and start execution.
:upvote: 1
To create a flow run for a deployment with new parameters, you could use the `OrionClient.create_flow_run_from_deployment` method, which allows you to pass parameters: https://orion-docs.prefect.io/api-ref/prefect/client/#prefect.client.OrionClient.create_flow_run_from_deployment
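As a rough illustration of creating a flow run for a deployment with overridden parameters, the sketch below only builds (and does not send) the kind of HTTP request involved. The base URL, the `/deployments/{id}/create_flow_run` path, the payload shape, the deployment ID and the `region` parameter are all assumptions for illustration, and `build_flow_run_request` is a hypothetical helper. The linked `OrionClient.create_flow_run_from_deployment` reference remains the authoritative source.

```python
import json
import urllib.request


def build_flow_run_request(api_url, deployment_id, parameters):
    """Build (but do not send) a POST request that would create a flow run.

    The endpoint path and body layout are assumptions, not confirmed
    by this thread; check them against the Orion API reference.
    """
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical values: a local Orion server and a made-up deployment ID.
request = build_flow_run_request(
    "http://127.0.0.1:4200/api",
    "00000000-0000-0000-0000-000000000000",
    {"region": "us-west"},
)
print(request.get_method(), request.full_url)
```

In practice the Python client method named above is likely the simpler route, since it handles the request details for you.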
k

Kevin Kho

03/05/2022, 10:38 PM
Ah ok I was wrong. Michael’s answer makes complete sense :upvote:
m

Michael Adkins

03/05/2022, 10:39 PM
The CLI doesn’t support this yet (`prefect deployment run`) because parsing parameters from the CLI into JSON is a bit of a pain and I haven’t implemented it, but we will definitely have it in the future.
:upvote: 1
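To give a feel for why turning CLI arguments into JSON parameters is fiddly, here is a small sketch of one possible approach. It is not Prefect's implementation: `parse_cli_parameters` and the `key=value` convention are hypothetical. Each value is tried as JSON first, which immediately raises the question of whether `12` was meant as a number or a string.

```python
import json


def parse_cli_parameters(pairs):
    """Turn ["key=value", ...] strings into a JSON-compatible dict.

    Each value is tried as JSON first (so 12, true, [1, 2] keep their
    types) and is kept as a plain string when it is not valid JSON.
    """
    parameters = {}
    for pair in pairs:
        key, separator, raw_value = pair.partition("=")
        if not key or not separator:
            raise ValueError(f"expected key=value, got {pair!r}")
        try:
            parameters[key] = json.loads(raw_value)
        except json.JSONDecodeError:
            parameters[key] = raw_value
    return parameters


# Hypothetical parameter names for illustration.
params = parse_cli_parameters(["region=us-west", "zoom=12", "tiles=[1, 2]"])
print(json.dumps(params))
```

With this convention a caller who really wants the string "12" has to quote it as JSON (`zoom="12"`), which then also has to survive shell quoting.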
h

Hans Lellelid

03/07/2022, 2:27 AM
Ah, ok, I think this just became clear to me, but will rephrase just to make sure I am understanding:
• I can call flows from my python code all I want without creating Deployments. In this case it would also be my API code that is responsible for invoking these. It sounds like if we wanted to keep this appropriately decoupled from our webapp backend, we'd write a thin wrapper API that kicks off the flows in this manner.
• If I want to use the Orion API to trigger the flows, then they need to be turned into Deployments, but there is a mechanism to do this with an API. (Though I imagine doing this for adhoc queries could dirty up the system over time, so maybe doesn't make the most sense.)
I saw some good blog posts on Prefect 1.0 code organization recommendations, but I think that might be a valuable thing to do eventually for Orion as well. Getting it up and running locally was super easy and love how magically things I'm running in console are showing up in the server. It became less clear to me how I'd manage large codebases of tasks and flows and what the entrypoints would look like for this / how these get packed into Docker containers. Anyway, I think this just comes down to trying to prototype out one of our current DAGs in Airflow. Not intending to derail this thread, though. I really appreciate the help!
k

Kevin Kho

03/07/2022, 2:43 AM
Creating a Deployment using the API is not recommended. There is so much logic that converts a Deployment to the appropriate API request that it would be very painful to replicate on your end. I think the recommendation would be to make a deployment using the Python client then just invoke. Yes, we’ll definitely make material around that as Orion comes off technical preview. I suspect there are limitations to just invoking the Python code without deployments. As a quick example, I don’t think you can tag Flows through the Flow object. You need it on the deployment. Also, the execution environment (FlowRunner) is specified on the DeploymentSpec. Michael might correct me though
👍 1