prefect-community
  • t

    tas

    07/18/2022, 3:01 PM
Hi there! Is there documentation on how to set up Prefect Server for a Dask cluster that is running on the same machine but in Docker containers (so the scheduler is in a container and its workers are also in separate containers)? I've created that cluster using our codebase, but whenever I run the flow from the Prefect UI, the task goes to the Dask scheduler, then a worker, but then it fails during run initialization (on the Dask worker) with "requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url". The task runs when I run it locally with flow.run(). Is there a config var that would point it outside of the Dask container?
    a
    • 2
    • 7
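A hedged sketch of one possible fix, assuming Prefect 1.x and Docker Desktop: the URL a flow run uses to reach the API comes from the `cloud.api` config, which can be overridden per container with the `PREFECT__CLOUD__API` environment variable so it no longer points at `localhost` inside the worker container:

```shell
# Sketch: give each Dask worker container a Prefect API URL that resolves
# from *inside* the container instead of localhost.
# "host.docker.internal" is a Docker Desktop convenience name; on Linux,
# use the host's IP or put the workers and the server on a shared network.
export PREFECT__CLOUD__API="http://host.docker.internal:4200"

# Illustrative worker start (image name is a placeholder):
# docker run -e PREFECT__CLOUD__API="$PREFECT__CLOUD__API" my-dask-worker
```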
  • t

    Tarek

    07/18/2022, 3:02 PM
    Hello Prefect folks! For my flow I am using
    KubernetesRun
    and
    Docker
registry for storage. In our GitLab CI/CD pipeline, we want to register the flow when the feature branch is merged into main. However, one problem we're facing is that our CI/CD pipeline runs on Docker, and since Prefect builds the Docker image in the registration step, our pipeline fails (Docker in Docker). I've been looking for a workaround for this and would be grateful for any ideas; I can tell you what I have tried so far in the comments.
    a
    k
    • 3
    • 4
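One workaround sketch for the Docker-in-Docker problem, assuming Prefect 1.x: build and push the image in a separate CI job (e.g. with a shell runner or kaniko), then point `Docker` storage at the prebuilt image and register with `build=False` so registration never invokes a Docker build. All names below are placeholders:

```python
from prefect import Flow, task
from prefect.storage import Docker

@task
def hello():
    print("hello")

with Flow("example-flow") as flow:
    hello()

# Assumes an earlier CI job already built and pushed this image, so the
# registration step only records metadata and never calls the Docker daemon.
flow.storage = Docker(
    registry_url="registry.example.com/team",  # placeholder
    image_name="example-flow",                 # placeholder
    image_tag="abc123",                        # e.g. the CI commit SHA
)
flow.register(project_name="my-project", build=False)
```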
  • p

    Pranit

    07/18/2022, 3:18 PM
I installed Prefect and was able to make cloud run jobs from my local machine a few days ago. But today I am not able to get Prefect Server started. It says
    -sh: prefect: command not found
    Am I missing some step?
    k
    • 2
    • 6
  • f

    Farooque Shaikh

    07/18/2022, 3:37 PM
I am trying to run a GraphQL query in Python using the below code:
params = {
    "date_obj": "2022-07-18T01:32:26.584864+00:00",
    "curr_obj": "2022-07-18T05:32:26.584864+00:00",
}
query = client.graphql({
    'query($date_obj: timestamptz)': {
        'flow_run(where: {start_time: {_gte: $date_obj}})': {
            'flow_id'
        }
    }
})
However, the parameter value for date_obj is not getting passed. Can someone please help with this?
    k
    • 2
    • 7
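For context on the variable-passing problem: Prefect 1's `Client.graphql` accepts a `variables` mapping, but the dict-style query above has no way to declare `$date_obj`; a full `query(...)` operation string is needed. A sketch (the actual client call is commented out since it needs a reachable API):

```python
import json

# A complete GraphQL operation that declares the variable. With the dict
# form there is nowhere to declare $date_obj, which is why its value is
# never passed through.
query = """
query($date_obj: timestamptz) {
  flow_run(where: {start_time: {_gte: $date_obj}}) {
    flow_id
  }
}
"""

variables = {"date_obj": "2022-07-18T01:32:26.584864+00:00"}

# Requires a running Prefect 1 API:
# from prefect.client import Client
# result = Client().graphql(query, variables=variables)

print(json.dumps(variables))
```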
  • r

    rectalogic

    07/18/2022, 5:49 PM
I set up MFA for my account, but it never prompts for the token when I log in. Any idea why?
    k
    j
    • 3
    • 7
  • s

    Scott Aefsky

    07/18/2022, 6:20 PM
    Hi all. I'm trying to create flow runs using the GraphQL API (Prefect 1.0), and am having trouble getting the syntax right to pass a parameter to my flow. Any help you could provide would be greatly appreciated. Details in thread.
    k
    • 2
    • 10
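A hedged sketch of the shape such a call can take, assuming Prefect 1's `create_flow_run` mutation with a JSON `parameters` input (the flow ID and parameter name are placeholders; the exact input fields are best confirmed in the interactive API schema):

```python
params = {"my_param": "value"}  # hypothetical flow parameter

# Passing values as GraphQL variables keeps the JSON out of the query
# string, which is usually the part that is fiddly to quote by hand.
mutation = """
mutation($flow_id: UUID!, $parameters: JSON) {
  create_flow_run(input: {flow_id: $flow_id, parameters: $parameters}) {
    id
  }
}
"""
variables = {
    "flow_id": "00000000-0000-0000-0000-000000000000",  # placeholder
    "parameters": params,
}

# Requires a running API:
# from prefect.client import Client
# Client().graphql(mutation, variables=variables)
```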
  • j

    Josh Paulin

    07/18/2022, 7:50 PM
    Hello. Getting an error when trying to create a new project with the same name as one I just deleted
    Traceback (most recent call last):
      File "/Users/joshuapa/.pyenv/versions/3.9.9/lib/python3.9/site-packages/prefect/client/client.py", line 1055, in create_project
        res = self.graphql(
      File "/Users/joshuapa/.pyenv/versions/3.9.9/lib/python3.9/site-packages/prefect/client/client.py", line 464, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['create_project'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    k
    • 2
    • 12
  • s

    Shaoyi Zhang

    07/18/2022, 8:35 PM
Hi, we are using Prefect Cloud 1.0. One member of our team is having trouble accepting invitations to a tenant. He already has access to tenant B, but when he tries to accept the invitation for tenant A, he gets a
401 unauthorized
error. This error applies to both the email invitation and the invitation in the Prefect Cloud UI. A different team member was able to accept an invitation to tenant A recently.
    ✅ 1
    m
    • 2
    • 1
  • c

    Christian Nuss

    07/18/2022, 8:57 PM
    question: is doing
    .run(...)
    on a task within a task an antipattern?
    k
    • 2
    • 5
  • e

    Emerson Franks

    07/18/2022, 11:39 PM
    Hopefully an easy one. I have a flow that needs to run 3 functions/tasks in order and I can't seem to find the correct syntax to make them run sequentially. Currently, I have simple code that looks like this:
    k
    • 2
    • 22
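If this is Prefect 1.x, one way to force ordering when no data is passed between tasks is `upstream_tasks`; a minimal sketch with placeholder tasks (data dependencies would order them implicitly as well):

```python
from prefect import Flow, task

@task
def step_one():
    print("one")

@task
def step_two():
    print("two")

@task
def step_three():
    print("three")

with Flow("sequential-example") as flow:
    a = step_one()
    b = step_two(upstream_tasks=[a])    # runs only after step_one
    c = step_three(upstream_tasks=[b])  # runs only after step_two

# flow.run()  # uncomment to execute locally
```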
  • n

    Nikolaus Landgraf

    07/19/2022, 7:15 AM
    Good morning! Since the upgrade to
    2.0b8
    we have experienced 2 things: • performance of single flows became around 2x slower (we are using the Sequential task runner) and have a lot of database operations in our tasks • Some Flows get interrupted by a 403 forbidden error and do not pick up from where they were left. Is there a way of increasing the lifetime of the token?
    a
    m
    • 3
    • 15
  • m

    Mike Geeves

    07/19/2022, 7:57 AM
Apologies in advance for quite a vague question 🙂 I've historically custom-rolled data pipelines (janky shell/Python scripts; slightly nicer but terrible observability with Camel). Is there much usage of, or are there nice ways of dealing with, time series and spatial data in Prefect? I can see, for example, being able to look at success/fail rates in the run history; I'm wondering how this could work when sometimes you only "care" about a failure for a short amount of time, and that's subject to when it last failed.

Use case: satellite images are processed for various areas; there could be one job for each tile of interest. Periodically each tile is checked for new data, and when there is some, a number of steps to load and process are performed. Sometimes there isn't data, so the first step fails. This can be because it isn't available yet, or it won't ever be available. Failed might be fine because there's nothing we can do anyway (there's just no data). However, if the same one has been failing for, say, a couple of weeks and there's no "recent" data, then it becomes bad. Sometimes bad things happen and everything fails; maybe an API key has been revoked or a service is down. Failed is bad.

For my uses I ended up tracking these and making a dashboard showing e.g. each day along the x axis and each tile along the y axis, to spot gaps which hopefully might go from red to green after retries. Is something like that possible via Prefect, e.g. pulling data out to create custom dashboards like that, or via API calls? This is purely "out of interest" rather than an immediate need; observability, and even being able to categorise failures, was a huge problem, so I'm just wondering if this is something catered for. If there are solutions, that would be great to hear, but if not, "nope, you're still on your own" is fine 😄
    s
    • 2
    • 4
  • v

    Vlad Tudor

    07/19/2022, 10:11 AM
[SOLVED] Hello, a newbie question. I am getting started with Prefect; when I run
prefect server start
I get this error:
OSError: [Errno 8] Exec format error: 'docker-compose'
I suspect it might be a
docker-compose
version issue, since the latest version is invoked as
docker compose
(without the
-
). Should I downgrade? (Currently Docker Compose version v2.6.0)
    ✅ 1
    • 1
    • 1
  • x

    xyzz

    07/19/2022, 10:22 AM
    is there a specific reason https://github.com/PrefectHQ/prefect/pull/5729 wasn't merged yet? It only changes a few lines of code and helps anyone who has only access to custom S3 endpoints.
    a
    • 2
    • 4
  • r

    Riccardo Tesselli

    07/19/2022, 1:16 PM
hello, in Prefect 2.0 I’ve defined a new custom block as described in https://orion-docs.prefect.io/concepts/blocks/#creating-blocks. Once I’ve defined the model, is there a way to upload the definition of the block to Prefect Cloud without having to create a block instance from code and then save it? I mean, I want to be able to see the new custom block definition in the UI without having to define a dummy instance of it.
    a
    a
    • 3
    • 5
  • m

    Matthew Seligson

    07/19/2022, 2:48 PM
    I have a flow that runs every month. I’d like the flow run name to always include the name of the month like “My Flow - July”. What’s the best way to do this?
    k
    • 2
    • 4
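Prefect 1 doesn't template run names on a schedule, but a hedged workaround is to create the runs yourself (from a parent flow or script) and pass a computed `run_name`; the name itself is plain Python:

```python
from datetime import datetime, timezone

def monthly_run_name(base: str, when: datetime) -> str:
    """Build a run name like 'My Flow - July' from a timestamp."""
    return f"{base} - {when:%B}"

name = monthly_run_name("My Flow", datetime(2022, 7, 19, tzinfo=timezone.utc))
print(name)  # My Flow - July

# Hypothetical Prefect 1 usage (needs a running API; kicked off from a
# parent flow or script rather than the schedule itself):
# from prefect.tasks.prefect import create_flow_run
# create_flow_run.run(flow_name="My Flow", run_name=name)
```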
  • y

    Yiwei Hou

    07/19/2022, 3:06 PM
hello!! I noticed that the Prefect UI will print out the ECS task ID in the log output when the task starts running; is there any way to retrieve it directly?
    k
    • 2
    • 6
  • m

    Michael Reynolds

    07/19/2022, 3:43 PM
    is there an open source ticket to track when the
    checkpoint = False
    flag will be introduced into
    orion
    / prefect 2.0?
    k
    m
    • 3
    • 4
  • a

    Alvaro Durán Tovar

    07/19/2022, 3:47 PM
hi! Creating a project is failing from both the CLI and code
    prefect.exceptions.ClientError: [{'path': ['create_project'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    k
    • 2
    • 13
  • m

    Michelle Brochmann

    07/19/2022, 4:50 PM
    From inside a task, how can I get the start time of the current flow run?
    k
    • 2
    • 1
  • m

    Michael Reynolds

    07/19/2022, 5:32 PM
    i have a component that is extremely costly to instantiate that needs to be used in multiple tasks. is there a way to distribute an instance of this as a singleton so that i do not need to reinitialize the costly processor each time the task is run? i'll provide a basic model of the python code demonstrating my dilemma
    k
    • 2
    • 19
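Independent of Prefect, a common pattern for this is a lazily-initialized, per-process singleton; `functools.lru_cache` on a zero-argument factory gives exactly that (`ExpensiveProcessor` is a stand-in for the costly component):

```python
from functools import lru_cache

class ExpensiveProcessor:
    """Stand-in for a component that is costly to construct."""
    def __init__(self):
        self.ready = True  # imagine heavy model loading here

@lru_cache(maxsize=1)
def get_processor() -> ExpensiveProcessor:
    # The first call builds the instance; later calls in the same process
    # return the cached object, so every task can just call get_processor().
    return ExpensiveProcessor()

assert get_processor() is get_processor()
```

With a distributed executor each worker process still builds its own copy once, but only once, which is usually the point.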
  • a

    Amogh Kulkarni

    07/19/2022, 5:48 PM
@Anna Geller - We are seeing an issue with our flow today. We have scheduled our flow every 5 minutes. We use KubernetesRun as our run config and S3 as our storage. We have scheduled the flow from Prefect Cloud. The issue we are seeing is that if a flow run’s runtime exceeds 5 minutes, a second flow run starts in parallel. Today, because of long-running upstream jobs, our first flow run ran for 15 minutes, and in the meantime there were 3 flow runs of the same flow running. Is there any way to ensure that a new flow run starts only when the previous flow run completes?
    ✅ 1
    k
    • 2
    • 4
  • c

    Chu

    07/19/2022, 7:35 PM
Hi community, what is the best practice for implementing parallel dbt jobs using Prefect 1.0? Basically, I need to send a different client_id to each dbt job and trigger dbt run for each client_id (pseudocode follows):
    with Flow as flow:
      for i in id_list:
        dbt_run_function(i)
    (I’m wondering if a simple for loop would achieve parallelism?)
    k
    • 2
    • 9
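A hedged sketch, assuming Prefect 1.x: a plain for loop (or `.map`) only builds independent task copies; whether they actually run in parallel depends on the executor, so something like `LocalDaskExecutor` is needed (the dbt task below is a placeholder for a real dbt shell task):

```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def dbt_run_function(client_id):
    # placeholder for triggering `dbt run` for one client
    print(f"dbt run for {client_id}")

with Flow("dbt-per-client") as flow:
    id_list = ["a", "b", "c"]
    dbt_run_function.map(id_list)  # one task run per client_id

# Default executor runs task runs serially; a Dask-backed executor
# is what actually makes the mapped runs parallel.
flow.executor = LocalDaskExecutor()
# flow.run()
```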
  • s

    Sebastián Montoya Tapia

    07/19/2022, 8:38 PM
Hi community! I am trying to use a monorepo where each folder represents a flow; currently I am using the ECS Agent to run the flows on AWS Fargate. I am a little bit lost on the errors I am currently having. I am building a Docker image for each folder, since it has a flow.py file and multiple others like extract.py, load.py, etc. that are imported from flow.py, and then registering the flow. Is this process correct, and if not, what would be a better solution?
    ✅ 1
    a
    k
    • 3
    • 2
  • s

    Seth Goodman

    07/19/2022, 9:32 PM
Hi All - is there a best practice when it comes to parallelization within tasks (I'm using a local Dask cluster, for reference)? My initial flow used task mapping, but there were tens of thousands of mapped items, which quickly burned through the free tier limit. My current thought is to test basic parallelization (e.g., Python's multiprocessing) within a task, but I worry that it will interfere with Dask's use of resources. Thanks in advance for any suggestions!
    k
    • 2
    • 4
  • i

    Ilya Galperin

    07/19/2022, 9:50 PM
    Hi all! I’m doing some testing with the DaskExecutor and spinning up an ephemeral Dask cluster on ECS Fargate. Although it seems to work, I ran into the same issue that was described here. I understand that the clean spin-down issue might not be super high priority since the workers seem to execute what is assigned to them but I did have a question about logging. Specifically, my logs look more like this (Beginning Flow Run, then Flow Run SUCCESS, then the RuntimeError) with none of the intermediary logging being captured from the
    list_sum
    function in the flow. Is there a configuration I might be missing that enables this, or has there been any progress in getting the output of DaskExecutor logs into Prefect Cloud?
    ✅ 1
    a
    • 2
    • 3
  • m

    Matthew Seligson

    07/19/2022, 10:14 PM
    I don’t see the paused or skipped states in the Orion docs. This seems like a loss in functionality. What is replacing these?
    ✅ 1
    a
    • 2
    • 3
  • m

    Maikel Penz

    07/20/2022, 2:22 AM
    Is there an intention/movement on putting a Terraform module for Prefect in place? Our Prefect Automations have been manually created and I’d like to have this in source control/managed through a CICD pipeline. Last resort would be to use GraphQL but Terraform would play better with our current stack.
    ✅ 1
    a
    o
    • 3
    • 6
  • e

    Emma Rizzi

    07/20/2022, 7:22 AM
Hello! Following the latest announcement about the pip install command, I need some clarification 🧐 I'm still using Prefect 1.0, not ready yet for migration. Do I just need to be careful about the Prefect version installed in the environment I use to register the flow, or are there other modifications to make? Thanks!
    m
    a
    • 3
    • 5
  • r

    Riccardo Tesselli

    07/20/2022, 8:08 AM
    In Prefect 2.0, I’m trying to create a Deployment which takes arguments from a configuration stored in Prefect Cloud as a custom block. Here is the sample code:
    config = CustomConfig.load('my_block')
    
    Deployment(
        name="My deployment",
        flow=my_flow,
        parameters={
            "password": config.password
        }
    )
    when I run this command from CLI
    prefect deployment create my_deployment.py
    I get this error
    AttributeError: 'coroutine' object has no attribute 'password'
    
    Failed to load deployments from 'my_deployment.py'
    sys:1: RuntimeWarning: coroutine 'Block.load' was never awaited
    How can I do that?
    a
    • 2
    • 8
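For reference, that AttributeError means `Block.load` returned an un-awaited coroutine when called outside an event loop; in a plain script, one hedged fix is to drive it with `asyncio.run`. Sketched here with a stand-in async loader, since running the real `CustomConfig.load` needs Prefect Cloud:

```python
import asyncio

# Stand-in for CustomConfig.load / Block.load, which at this point in the
# 2.0 betas returns a coroutine when called outside an async context:
async def load_block(name: str) -> dict:
    return {"password": f"secret-for-{name}"}

# config = load_block("my_block")   # coroutine object -> .password fails
config = asyncio.run(load_block("my_block"))  # actually awaits the load
print(config["password"])
```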
a

Anna Geller

07/20/2022, 10:33 AM
Try out the Secret block instead that just got released
check out https://discourse.prefect.io/t/how-to-securely-store-secrets-in-prefect-2-0/1209
r

Riccardo Tesselli

07/20/2022, 11:24 AM
thanks @Anna Geller, but I think my situation is different from the example you provided. I know I can save and load blocks within flows, but at the moment I want to create a deployment as described in https://orion-docs.prefect.io/concepts/deployments/#creating-deployments by using the CLI. I assumed that load would also work outside a flow, but it looks like it doesn't. Now I'm trying to create the deployment directly in Python with
Deployment.create()
, but I'm facing this error:
here is the code
if __name__ == '__main__':

    config = CustomConfig.load('my_setup')

    deployment = Deployment(
        name="MyDeployment",
        flow=my_flow,
        parameters={
            "password": config.password,
            "slack_webhook": config.slack_webhook        }
    )
    deployment.create()
and I get this
TypeError: Object of type 'AsyncWebhookClient' is not JSON serializable
so it looks like Pydantic is failing to serialize the Slack Webhook block
so in the end I fixed it by doing this
if __name__ == '__main__':

    config = CustomConfig.load('my_setup')

    deployment = Deployment(
        name="MyDeployment",
        flow=my_flow,
        parameters={
            "password": config.password,
            "slack_webhook": config.slack_webhook.url        }
    )
    deployment.create()
nevertheless, it could be handy to be able to quickly inject a block stored in the cloud into a deployment
a

Anna Geller

07/20/2022, 12:50 PM
we'll have more Deployment recipes after the GA release; it should get easier in the coming weeks