https://prefect.io logo
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • m

    Mukamisha jocelyne

    03/12/2022, 11:22 AM
    Hi! I am trying to create a directory within a container, something simple that looks like the code snippet below (in the thread). The task is running successfully but failing to create the data directory. I was thinking that it's supposed to create the directory without any issue, any suggestions to what I am missing please? Thank you
    k
    • 2
    • 18
  • j

    James Harr

    03/12/2022, 2:20 PM
    Hi, I'm trying to work through the Prefect Orion tutorial for running flows in Kubernetes (https://orion-docs.prefect.io/tutorials/kubernetes-flow-runner/) and I keep running into the same problem. When I run the deployment,
    prefect deployment run test-flow/test-deployment
    , the UI shows that the flow run is Pending. When I look at the logs in Kubernetes with
    kubectl logs -l job-name=voracious-antelope2b89g
    , I see a FileNotFound stack trace Thank you for your help!
    k
    m
    • 3
    • 5
  • t

    Tomer Cagan

    03/13/2022, 12:21 PM
    Hello, Trying to get a performance report based on the information executors documentation. I am getting an error - see in thread
    a
    k
    • 3
    • 9
  • d

    Dekel R

    03/13/2022, 4:50 PM
    Hey, I’m trying to run a flow with LocalDaskExecutor - this is my code (simplified - I have multiple tasks after those 2)-
    CREDENTIALS_SECRET_NAME = 'PREFECT_SERVICE_ACCOUNT_CREDENTIALS'
    
    
    with Flow('comparable-products-data-extraction-development',
              storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                             dockerfile="./Dockerfile"), executor=LocalDaskExecutor(scheduler="processes")) as flow:  # , schedule=daily_schedule
        raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
        # raw_credentials = get_local_credentials()
        google_credentials = parse_credentials(credentials=raw_credentials)
    And here is the parse_credentials task -
    from prefect import task
    from google.oauth2 import service_account
    
    
    @task()
    def parse_credentials(credentials):
        return service_account.Credentials.from_service_account_info(credentials)
    This works fine in multiple flows when not using Dask executor - and it also runs without any problem locally when using Dask…. When running this in Prefect cloud - (PREFECT_SERVICE_ACCOUNT_CREDENTIALS is saved as a json secret, and it works in other flows) I get this Dask error - (see at the my first comment) Any solution for this? what am I missing? Thanks
    a
    • 2
    • 5
  • r

    Raymond Yu

    03/14/2022, 2:23 AM
    Heya Prefectionists, I’ve built something of a dumb s3 sensor, but it appears to get stuck whenever it attempts to retry. Any suggestions or ideas as to why it could hang on retry? I’ll include a snippet of the code in the comments, all the upstream tasks have result types in PrefectResult().
    k
    a
    • 3
    • 8
  • a

    Ayah Safeen

    03/14/2022, 6:42 AM
    Hi all, I'm using prefect core, and when I
    run prefect server start
    I get the following error
    Pulling postgres ... error
    Pulling hasura   ... error
    Pulling graphql  ... error
    Pulling apollo   ... error
    Pulling towel    ... error
    Pulling ui       ...
    a
    • 2
    • 2
  • r

    richard-social 812

    03/14/2022, 8:35 AM
    Hello, Is this the correct way to ensure a task only runs once a week, in a flow scheduled to run daily?
    @task(result=LocalResult(dir=f'{os.getcwd()}/storage/pre_delinquency_models',
                             serializer=H2OModelSerializer()),
          checkpoint=True, target='{date:%Y}-Week {date:%U}', log_stdout=True)
    def train_new_model(data: pd.DataFrame):
    I have it running on prefect cloud with a run_config that looks like below:
    run_config = LocalRun(env={'PREFECT__FLOWS__CHECKPOINTING':os.environ.get('PREFECT__FLOWS__CHECKPOINTING', 'true')})
    However, looking at the results folder I see that the task result file is created a new with each daily run . What am I missing?
    e
    a
    • 3
    • 4
  • d

    Dekel R

    03/14/2022, 9:21 AM
    Hey, I have a flow running using Vertex Run - this is my run config -
    flow.run_config = VertexRun(scheduling={'timeout': '3600s'},
                                machine_type='n2-highcpu-80', labels=["ml"],
                                service_account=PREFECT_SERVICRE_ACCOUNT)
    It works without the scheduling parameter - I added it and used this documentation - https://docs.prefect.io/orchestration/flow_config/run_configs.html#vertexrun Now when registering to Prefect cloud and running I get this error -
    Parameter to MergeFrom() must be instance of same class: expected google.protobuf.Duration got str.
    Am I missing something? Thanks
    a
    m
    • 3
    • 3
  • e

    Emma Rizzi

    03/14/2022, 9:54 AM
    Hello! I have a flow that uses DateTimeParameters, I would like to schedule it with one date being default to the day of the schedule, or first day of the month for example, is there a way to do it through schedules or should I handle this in my code ?
    s
    a
    • 3
    • 3
  • p

    Patrick.H

    03/14/2022, 1:08 PM
    hello
    👋 4
  • h

    Hugo Polloli

    03/14/2022, 1:39 PM
    Hi, I have chained tasks that are simply database operations, they don't return anything but they have to be run in a specific order. To this day I've simply used a boolean, I
    return True
    and pass it, unused, to the next task. Is that ok or did I miss a way to do that more "beautifully" in the docs ? (I don't find it particularly bad, + with the fact that I suffix it with "_done" I think it's explicit, but was still wondering)
    t1_done = task1()
    t2_done = task2(t1_done)
    ....
    s
    t
    • 3
    • 3
  • c

    Chris Reuter

    03/14/2022, 2:12 PM
    Hi all! It's our first ever launch week and to celebrate, in case you don't following the #announcements channel....we're giving away free pizza starting at 12p Eastern! https://prefect-community.slack.com/archives/CKNSX5WG3/p1647264267554719
    💙 1
    🎉 1
    🍕 8
    :marvin: 5
  • n

    Nico Neumann

    03/14/2022, 2:23 PM
    Hi, Is it possible to have a callback function which is called every time a flow changes its state which runs on the system where I kick off the flow rather than on the system actually running the flow? I tried
    Flow(..., state_handlers=[…])
    but the callback is called on the system where the flow is executed. My idea is to use
    StartFlowRun
    e.g. on my local computer which starts the flow in AWS cloud. And every time the state changes (
    submitted
    ,
    running
    ,
    canceled
    , etc.) a local callback function is called where I see the
    state
    and
    flow_run_id
    .
    k
    a
    • 3
    • 3
  • t

    Tyndyll

    03/14/2022, 2:29 PM
    Hi, I've what is probably a fairly basic question, but I can't find the right sequence of words to get an answer in the docs. Say I have a flow that takes 23 hours to run, and I kick it off at Midnight each night. Is it possible to block today's run if yesterdays is still running?
    e
    k
    a
    • 4
    • 5
  • c

    Chris Reuter

    03/14/2022, 4:59 PM
    Free pizza is live! Get it while it's hot, just reply with your favorite open source project 🍕 https://twitter.com/PrefectIO/status/1503397645674504192
    :marvin: 1
    :upvote: 1
  • a

    Anna Geller

    03/14/2022, 5:19 PM
    Apart from providing Pizza, we've also just launched this beautiful Community website http://prefect.community By far, the best part of it is the page featuring Top Prefectionists: https://www.prefect.io/community/top-contributors/ Kudos to all contributors! Thanks for being part of our community 🚀
    ❤️ 8
  • c

    Chris Reuter

    03/14/2022, 8:20 PM
    FYI - two live events for Launch Week! Will we see you there? https://prefect-community.slack.com/archives/C036FRC4KMW/p1647289203959699
  • e

    etem citil

    03/14/2022, 8:38 PM
    Hello everyone, how do you manage security requirements in Prefect core opensource version such as UI/API encrypted acces, authn/z etc..? is there any workaround applied by anyone? It seems cloud version solves all these problems. is there any way to handle these security requirements using in on prem?
    k
    • 2
    • 1
  • a

    Apoorva Desai

    03/14/2022, 9:40 PM
    Hello, I am trying to add slack notifications for a prefect flow using this: https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#installation-instructions I have a slack webhook URL that I've saved as a prefect secret. I am importing this:
    from prefect.utilities.notifications import slack_notifier
    and my tasks now look like this :
    task(log_stdout=True, state_handlers=[slack_notifier])
    
    install_snowflake_task = ShellTask(helper_script="pip install boto3 \
        snowflake-connector-python[pandas] \
        snowflake-ingest && pip install PyJWT==1.7.1", shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
    
    install_dbt_task = ShellTask(helper_script="pip install dbt==0.18.0", \
                                 shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
    My flow looks like
    with Flow("name-of-flow", state_handlers=[slack_notifier]) as flow:
    The flow runs successfully but I see no notifications on the slack channel that I have authorized for this. What am I doing wrong?
    a
    k
    l
    • 4
    • 38
  • w

    Wieger Opmeer

    03/14/2022, 10:54 PM
    Quick Orion question: is it possible to set the name of a task at run-time of a flow?
    m
    • 2
    • 5
  • s

    Stevon Shakey Eugene Crowder, Jr.

    03/15/2022, 4:01 AM
    💃 Just arrived!
    👋 4
    k
    a
    • 3
    • 2
  • k

    Kevin Otte

    03/15/2022, 4:03 AM
    question: Let's say I have 2 separate repos where prefect lives...can I post a 'wait for done" flag type object (similar to airflow) that sends a signal to the 2nd Flow that it can be started?
    k
    a
    • 3
    • 3
  • s

    Scarlett King

    03/15/2022, 12:55 PM
    Hi guys, I have a question regarding Prefect result. When the result is uploaded to Azure Blob Storage, does any encryption happen between transit? If yes, which encryption standard is that?
    k
    • 2
    • 2
  • m

    Muddassir Shaikh

    03/15/2022, 12:56 PM
    Can we create a task which runs before the flow ends, when all upstream task are successful. (without mentioning all upstream tasks name)
    k
    • 2
    • 4
  • j

    Jacob Wilson

    03/15/2022, 4:31 PM
    Hello Community, I am running into a snag and I was wondering if anyone had any insight on what is going on. It appears my flow is getting stuck in the
    Submitted for execution:
    phase. I am using
    Docker
    storage (The image is hosted in ECR and I’ve confirmed my execution role has access to the repo) and
    ECS Run
    . The flow runs locally but not in ECS. Dockerfile:
    FROM prefecthq/prefect:latest-python3.8
    WORKDIR /opt/prefect
    COPY requirements.txt .
    RUN pip install --upgrade pip && \
        pip install --no-cache-dir -r requirements.txt
    Flow code:
    import os
    from prefect import Flow, task
    from prefect.run_configs import ECSRun
    from prefect.storage import Docker
    
    
    @task(log_stdout=True)
    def extract():
        x = [4, 5, 6]
        print("Starting: {}".format(x))
        return x
    
    
    @task
    def transform(y):
        return [i * 10 for i in y]
    
    
    @task(log_stdout=True)
    def load(z):
        print("Received: {}".format(z))
    
    
    with Flow("Test Flow", storage=Docker(dockerfile="Dockerfile", registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME"))) as flow:
        e = extract()
        t = transform(e)
        l = load(t)
    
    flow.run_config = ECSRun(
        task_role_arn=os.getenv("TASK_ROLE_ARN"),
        execution_role_arn=os.getenv("EXECUTION_ROLE_ARN")
    )
    k
    • 2
    • 2
  • b

    Bradley Hurley

    03/15/2022, 4:55 PM
    Hi Folks - I have a flow that uses the Prefect provided
    RenameFlowRun
    . Would it be expected that in a downstream task if I call
    prefect.context.get("flow_run_name")
    the new/updated name is returned?
    k
    • 2
    • 9
  • x

    Xavier Babu

    03/15/2022, 5:30 PM
    How to call Prefect Python APIs within Prefect Orion? Also, how to call Prefect Orion Python APIs from a Java application? Do we have any tutorial or doc for both cases?
    m
    • 2
    • 6
  • s

    Shaoyi Zhang

    03/15/2022, 5:33 PM
    Hi folks, is Prefect SOC 2 compliant?
    k
    • 2
    • 2
  • a

    Adam Roderick

    03/15/2022, 5:36 PM
    Hey all! I am working on more complex flow tasks that require external libraries like pandas and scikit-learn. I've had trouble deploying before due to pickling not supporting all libraries. Do you have any general guidance, or a list of "known pickleable" libraries?
    k
    • 2
    • 4
  • s

    Sarah Floris

    03/15/2022, 5:53 PM
    Is there an easy way to get my secrets from local .env file into Prefect Cloud?
    k
    • 2
    • 3
Powered by Linen
Title
s

Sarah Floris

03/15/2022, 5:53 PM
Is there an easy way to get my secrets from local .env file into Prefect Cloud?
k

Kevin Kho

03/15/2022, 5:54 PM
Probably the easiest is loading it in and using this:
from prefect import Client

client = Client()
client.set_secret(name="MYSECRET", value="MY SECRET VALUE")
s

Sarah Floris

03/15/2022, 8:20 PM
And I assume I have to do that every time I register my flow with new updates?
k

Kevin Kho

03/15/2022, 8:22 PM
I don’t see why if the secrets stay the same?
View count: 7