https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • p

    Patrick Tan

    06/06/2022, 8:01 PM
    Hi, When I run flow from command line or UI, I can specify log level, Can I specify log level (eg DEBUG) from code like below?
    create_flow_run.map(
        flow_name=unmapped("LiveLots-ETL"),
        project_name=unmapped(f"{prefect_project}"),
        run_name=flowname_list,
        run_config=unmapped(LocalRun(labels=[prefect_label])),
        idempotency_key=idempotency_keys,
        parameters=filelist,
    )
    k
    • 2
    • 6
  • m

    Matt Alhonte

    06/06/2022, 11:01 PM
    What's a good way to try multiple model types with the meta-estimators (like
    RandomizedSearchCV
    )in
    dask-ml
    ?
    map
    ?
    k
    • 2
    • 3
  • s

    Sandip Viradiya

    06/06/2022, 11:43 PM
    Hello fellas, I’m looking for a solution that can change a Prefect task state from outside, like using a post request from Postman to change a running Prefect task state from running to Success, do you know there could be a good option for this, thank you.
    ✅ 1
    k
    a
    • 3
    • 8
  • d

    Dileep Damodaran

    06/07/2022, 3:53 AM
    Hi all, Is there any reason why a task runs regardless of the case result? I want to run a task only when a parameter is set to True. Implemented this using the Case control flow, but it runs irrespective of the case result. That is even when the parameter is set to False.
    k
    • 2
    • 9
  • s

    Stephen Lloyd

    06/07/2022, 5:14 AM
    Hi. I’m running on ECS with local dask executor. If I understand my config correctly, I’m running with 512 cpu and 1024 memory I have a process that does a fairly big mapping (could be 1000s or 100ks of api calls). I’d like to run with as many workers as possible. I am using 16 workers with default scheduler. This gets me about 16 tasks per second on my laptop, but ECS performance seems to be more like 4-5 tasks per second. It seems like
    DaskExecutor
    might be an option, but I don’t quite understand all the setup I would need to do.
    a
    • 2
    • 6
  • a

    Antti Tupamäki

    06/07/2022, 8:57 AM
    Hi how https://docs.prefect.io/api/latest/tasks/aws.html actual works? Now our ci/cd pipeline failed when it tried register flows and it did not have proper access to s3 bucket during registration. But then I refactored code to use task which uses boto and ci/cd pipeline passed.
    a
    • 2
    • 1
  • s

    Sang Young Noh

    06/07/2022, 11:05 AM
    Hello. I’m currently trying to delete some deployments and redeploy onto the prefect cloud, but I’m getting the following error:
    Client error '401 Unauthorized' for url '<https://api-beta.prefect.io/api/accounts/5c1f7ebd-7031-4f69-bc00-6d21a9b17548/workspaces/9a321454-aa9e-4fb8-8f4c-17e327a77a01/blocks/get_default_storage_block>'
    For reference, I was trying to run some deployments on prefect cloud, and when that didnt work, I logged out of prefect cloud and activated a local orion server instead. The deployment command I am running is:
    prefect orion database reset && prefect deployment create manager.py
    Any help on this would be much appreciated!
    ✅ 1
    a
    • 2
    • 45
  • i

    Ilya Sapunov

    06/07/2022, 11:45 AM
    Hi I want to create multiple task instances with different credentials to get data from several accounts
    with Flow("import") as flow:
        for i in range(5):
            res = import_task(
                command="echo $CREDENTIALS",
                env={
                    "CREDENTIALS": str(i)
                },
            )
    But in that case all tasks will run in parallel, which will cause the request limits to be exceeded Can I run those tasks sequentially ?
    a
    • 2
    • 1
  • f

    flurven

    06/07/2022, 11:59 AM
    How does 2.0 work with raspberry pi? 1.0 works fine, but I want the new shiny stuff running at home as well. Had some trouble getting Orion to start though.
    ✅ 1
    a
    • 2
    • 11
  • m

    Martin T

    06/07/2022, 1:48 PM
    What is the proper way to modify one Parameter() based on another Parameter()? This does not work:
    from prefect import Flow, Parameter, case, task
    
    @task
    def mytask(p2):
        print(p2)
    
    with Flow("case-demo") as flow:
        p1 = Parameter("p1", default=False)
        p2 = Parameter("p2", default="good")
        with case(p1, True):
            p2 = "bad"
        mytask(p2)
    
    print(flow.tasks)
    $ prefect build -p flow.py
    UserWarning: Tasks were created but not added to the flow: {<Parameter: p1>, <Parameter: p2>}.
    $ prefect run -p flow.py
    ...bad...
    ✅ 1
    a
    k
    • 3
    • 4
  • m

    Marcin Grzybowski

    06/07/2022, 1:51 PM
    hi I'm trying to run prefect2.0 in docker containers that I build manually when I run flows in a container, they seem to work I mount orion.db from host in docker run
    -v /home/marcin/.prefect/orion.db:/root/.prefect/orion.db
    when I run orion dashboards from host:
    prefect orion start
    it works as intended - shows flows run from container. But when I try to host dashboards from container I get empty dashboards (also mounting oriond.db, so both containers see the same database
    -v /home/marcin/.prefect/orion.db:/root/.prefect/orion.db
    added -p 4200:4200 to map port fdrom host to container, and exported PREFECT_ORION_API_HOST=0.0.0.0 so I can access dashboards in container from my host. orion.db is properly mounted in both containers, because I see that it changes in them when i run a flow. Dashboards are accessible, but empty...
    ✅ 1
    k
    a
    • 3
    • 15
  • f

    Florian Guily

    06/07/2022, 2:22 PM
    Hi everyone, is there a sort of caching process with prefect secret ? I just changed one and my workflows are still getting the older value of the secret.
    k
    • 2
    • 3
  • j

    John O'Farrell

    06/07/2022, 2:40 PM
    Hey, I'm trying to configure an ECSRun using a task definition. However when I try running it, it throws
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Container.image should not be null or empty.
    even though I know that the image URI that I'm using within
    containerDefinitions
    has content.
    k
    • 2
    • 4
  • d

    Dekel R

    06/07/2022, 5:57 PM
    Hey, I have a flow that generates a big pandas DataFrame (about 19M rows and 13 cols) and then saves it to Google BigQuery/Google storage.. etc (There are multiple tasks - but for this question I’m gonna simplify the flow). Task_a returns a df - I log the shape of it (at the last line of task_a) and get the 19M and 13 cols df - then this output gets sent to the next task - Task_b - I log the type and shape in the first row of this task - and its a Nonetype. Anyone ever had such an issue? (I debugged it locally and I cant reproduce the issue - I only get it once processing a lot of data - maybe its the issue here?) Example -
    @task()
    def task_a():
       df = pd.Dataframe()
       ......
       <http://logger.info|logger.info>(df.shape) # 19M and 13 cols
       return df
    
    @task()
    def task_b(df):
       <http://logger.info|logger.info>(type(df)) # Nonetype
    
    with Flow('****',
              storage=Docker(registry_url="us-central1-docker.pkg.dev****/",
                             dockerfile="./Dockerfile"), executor=LocalDaskExecutor(scheduler="processes")) as flow: #
          df = task_a()
          task_b(df)
    k
    • 2
    • 15
  • m

    Malthe Karbo

    06/07/2022, 6:13 PM
    Hi! Is there any news on orion beta 6 release?
    m
    k
    • 3
    • 7
  • j

    Jason Motley

    06/07/2022, 7:54 PM
    Can I bucket tasks together for the purposes of parallel execution with a Dask Executor? For example, Tasks 1-9 are dependent (3 depends on 2, 2 depends on 1) while Task 10 can be run simultaneous with the entire group of tasks 1-9.
    k
    • 2
    • 1
  • r

    Ron Budnar

    06/07/2022, 9:31 PM
    Hi! I have a question about pulling secrets when a flow is executing. I've followed this guide for setting up a Prefect/DBT flow and want to be able to grab fresh database credentials at runtime without having to reregister the flow. Following the guide, I've got basically this:
    @task
    def get_dbt_kwargs(**kwargs):
        return kwargs
    
    
    with Flow(name="dbt_command_flow", run_config=docker_run, storage=STORAGE) as flow:
        dbt_repo = Parameter(dbt_repo_url", ...)
    
        dbt_env = get_dbt_kwargs(
            DBT_ENV_SECRET__SQL_SERVER=PrefectSecret("DBT_ENV_SECRET__SQL_SERVER"),
            DBT_ENV_SECRET__SQL_DATABASE=PrefectSecret("DBT_ENV_SECRET__SQL_DATABASE"),
            DBT_ENV_SECRET__SQL_USER=PrefectSecret("DBT_ENV_SECRET__SQL_USER"),
            DBT_ENV_SECRET__SQL_PASSWORD=PrefectSecret("DBT_ENV_SECRET__SQL_PASSWORD"),
        )
       ... # a couple of tasks omitted for brevity
       dbt_run = dbt(
            env=dbt_env,
            command=dbt_command,
            task_args={"name": "Run DBT Shell command"},
            upstream_tasks=[pull_task, dbt_env],
        )
    My question is - is there a better/preferred way to do the above? Basically, I'm trying to ensure that if credentials/secrets are changed in Prefect Cloud, then the next run picks them up correctly.
    k
    k
    • 3
    • 5
  • j

    Jacob Bedard

    06/08/2022, 12:00 AM
    I'm having a hard time recalling this, and I know I've fixed it before. I've got prefect installed in an environment in anaconda, and I'm running the env, but I can't call the Prefect CLI. I keep getting this "prefect: command not found" error, but I don't know where to go find the executable to go make it executable. Can someone point me to it?
    k
    • 2
    • 3
  • a

    Andreas Nigg

    06/08/2022, 6:59 AM
    Hey, maybe a stupid question, but how do you re-apply a deployment withour re-creating it? I'm refering to the update path for the latest prefect release.
    ◦ If you are using an agent running on Kubernetes, update the Prefect image version to
    2.0b6
    in your Kubernetes manifest and re-apply the deployment.
    Thanks for the update by the way - up until now, working with prefect is just awesome - I enjoy every minute of it.
    ❤️ 1
    a
    • 2
    • 4
  • t

    Tarek

    06/08/2022, 7:44 AM
    hi, i managed to run local agent in docker container + my app and services in docker container two..i was trying to put the creation of the tenant
    prefect server create-tenant --name default --slug default
    in the dockerfile of my app, it doesn't work. why do other users have to install prefect locally outside the containers and run the command there. no way to have the command somewhere in the docker images?
    ✅ 1
    m
    a
    • 3
    • 6
  • a

    Andreas

    06/08/2022, 8:51 AM
    HI! I would like to ask if parameters passed to a task in Prefect 2.0 are also validated with pydantic or is this the case only in the flow level. (For example if I have provided pydantic annotations/field types or a pydantic model to a downstream task will prefect still check the data the upstream task passes to the downstream task in runtime?)
    ✅ 1
    a
    m
    • 3
    • 4
  • a

    Arthur Jacquemart

    06/08/2022, 11:26 AM
    Hi Prefect community. I am playing around with the GraphQL API and I do not understand the output. I want to check if an agent has some task "pending" or "running". I managed to do it but I do not understand the answer I get back from the API
    ✅ 1
    a
    • 2
    • 23
  • f

    Frederick Thomas

    06/08/2022, 1:28 PM
    Hi all< We're attempting to use the Docker agent but have run into a problem: the test flow that we created is registered to a local agent. Could you please assist. Thanks!
    ✅ 1
    a
    j
    k
    • 4
    • 10
  • d

    Danilo Drobac

    06/08/2022, 1:32 PM
    I'm trying to run
    prefect cloud login --key <API Key I Just Created>
    but I keep getting an error that says:
    Unable to authenticate. Please ensure your credentials are correct.
    Am I missing something?
    ✅ 1
    a
    • 2
    • 27
  • n

    Nelson Griffiths

    06/08/2022, 1:52 PM
    Is there a plan to add the ability to run flows on GCP Cloud Run with Prefect 2.0 anytime soon?
    ✅ 1
    👍 2
    a
    m
    m
    • 4
    • 12
  • o

    Ollie Sellers

    06/08/2022, 2:01 PM
    Hello, we’re running Prefect 1 and recently upgraded from LocalDaskExecutor to DaskExecutors run on KubeClusters with Azure Kubernetes Service running the agent. We have a number of flows which are then run by a master flow. The master flow runs and can complete but doesn’t seem to be parallelising as expected - for the underlying tasks, the mapped children are running 1 by 1 even though the adapt_kwargs are set to minimum of 1 and maximum of 10 in each DaskExecutor. Is this a problem with running the flows from a master flow which also has a DaskExecutor defined?
    k
    a
    t
    • 4
    • 30
  • a

    Alexandru Anghel

    06/08/2022, 2:11 PM
    Hello again, Thanks for the help so far, i've been able to build the foundations for orchestrating some pipelines. However, i'm coming back with a question. I have a flow containing two tasks: the first one is creating a bigquery table and the second one is calling an API at seconds interval (while loop with sleep) and inserting the payload into the bigquery table. The problem i encounter is when i try to register the flow using
    flow.register(project_name='my-project')
    inside the python code. The while loop from the second task is preventing the register and it is unreachable. I want to register and run the flow using
    python file.py
    More code inside the thread. Is there a way to reach the register command of the flow inside the python code? Thank you!
    k
    • 2
    • 5
  • i

    Ilhom Hayot o'g'li

    06/08/2022, 2:20 PM
    Hi all! I am trying to retreive time taken for @task and Flow( ) How to retreive/monitor start and end times of flow in the code. Is there any built in functions?
    k
    n
    • 3
    • 6
  • t

    Tim Helfensdörfer

    06/08/2022, 2:52 PM
    In the REST API docs for version 2, is there a chapter about authentication I am missing? https://orion-docs.prefect.io/api-ref/rest-api/
    ✅ 1
    a
    • 2
    • 9
  • t

    Tim Enders

    06/08/2022, 3:15 PM
    Looking back at Prefect 2.0 and trying to get my old testing (from alpha days) spun back up. I am getting this error I think with regards to registering my flow?
    sqlite3.OperationalError: table flow already exists
    ✅ 1
    a
    f
    • 3
    • 12
Powered by Linen
Title
t

Tim Enders

06/08/2022, 3:15 PM
Looking back at Prefect 2.0 and trying to get my old testing (from alpha days) spun back up. I am getting this error I think with regards to registering my flow?
sqlite3.OperationalError: table flow already exists
✅ 1
I get it trying to run the flow AND trying to start the UI server. So it might also just be a database issue.
a

Anna Geller

06/08/2022, 3:20 PM
You can remove the DB and uninstall Prefect, it would be the easiest
t

Tim Enders

06/08/2022, 3:21 PM
Seems like i just need to nuke the SQLLite DB, but I can't find it... where would it be found?
a

Anna Geller

06/08/2022, 3:21 PM
Then of course install the latest version:)
In home .prefect
t

Tim Enders

06/08/2022, 3:21 PM
I am in a new Venv, so it has installed the latest version, but I think it is trying to recreate the DB over top. Thank you!
Follow up question on this DB when using the cloud. Will there still need to be a local DB running?
a

Anna Geller

06/08/2022, 4:11 PM
No - just install Orion, login to Cloud 2.0 with your API key and you should be good to go without worrying about the database
f

flurven

06/08/2022, 6:35 PM
I got the same issue on the raspberry pi. Removed the db, renamed the table flow, tried a few things. Then I got another error instead...
a

Anna Geller

06/08/2022, 9:09 PM
it looks like it's resolved now for both of you?
t

Tim Enders

06/08/2022, 9:10 PM
Yeah, I deleted my SQLite DB and it recreated it no problem.... getting my flow ported before I try to hook up to Cloud 2.0
👍 1
a

Anna Geller

06/08/2022, 9:45 PM
nice 🙌
View count: 4