prefect-community
  • r

    Roger Webb

    08/09/2022, 1:22 PM
Hey all, is there any indication that a label applied when a flow is registered does not get transferred to the instance of the flow started by a schedule? I.e., Flow_A is registered with label "ABC" (which is clear in the GUI), but when the flow's schedule kicks off, label "ABC" is not on that execution, so it doesn't actually execute; it just sits in "Submitted".
    âś… 1
    r
    • 2
    • 7
  • p

    Pedro Machado

    08/09/2022, 2:01 PM
Hi everyone. I need to copy a set of large files from S3 to Azure storage (blob). It looks like both the S3 and the Azure Blob tasks in the library read the data into memory. I tried rewriting them to stream the data instead. I got it to work, but the machine gets unresponsive when transferring large files. This is running in a container on an AWS Linux instance (DockerRun). Any suggestions on the best way to stream a file this way without reading it into memory? Thanks!
    r
    • 2
    • 2
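    A generic approach (not Prefect-specific) is to move the bytes in fixed-size chunks so that only one chunk is in memory at a time. The sketch below is a minimal chunked copy between two file-like objects; since `boto3`'s download streams and `azure-storage-blob`'s `upload_blob` both work with file-like objects, something of this shape could sit between them. The names and chunk size here are illustrative:

    ```python
    import io

    def stream_copy(src, dst, chunk_size=8 * 1024 * 1024):
        """Copy src to dst in fixed-size chunks, keeping at most one chunk in memory."""
        total = 0
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            dst.write(chunk)
            total += len(chunk)
        return total

    # In-memory stand-ins for the S3 download stream / Azure upload stream:
    src = io.BytesIO(b"x" * 1000)
    dst = io.BytesIO()
    copied = stream_copy(src, dst, chunk_size=256)
    ```

    If the machine still becomes unresponsive with a bounded chunk size, the pressure is usually disk or network I/O rather than memory.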
  • r

    Roger Webb

    08/09/2022, 2:15 PM
Hey all, if I have a flow with the following:
    Flow_A = create_flow_run(
        flow_name="Flow A",
        project_name="Project A",
        task_args=dict(name="Flow A (Execution)"),
        scheduled_start_time=pendulum.now().add(minutes=60),
        parameters={"Parameter1": "Flow A Parameter 1"}
    )
    Flow_A_Flag = wait_for_flow_run(
        Flow_A,
        raise_final_state=True,
        stream_logs=True
    )
    I would expect the parent flow to kick off the flow run, and then wait_for_flow_run to wait for an hour before succeeding. But it appears that Flow A actually executes immediately, not waiting the hour, so the wait_for succeeds after minutes. Is my misunderstanding in scheduled_start_time, or in the wait_for?
    đź‘€ 1
    âś… 1
    b
    • 2
    • 23
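    One possible explanation (an assumption, not confirmed in this thread): code inside a Prefect 1 `with Flow(...)` block is evaluated when the flow is built/registered, so `pendulum.now().add(minutes=60)` captures a fixed timestamp at registration time. By the time a scheduled run executes, that timestamp may already be in the past, and the child run starts immediately. A plain-Python illustration of that eager-evaluation pitfall:

    ```python
    from datetime import datetime, timedelta

    # Pretend the flow was registered ("built") at 13:00.
    built_at = datetime(2022, 8, 9, 13, 0)

    # Evaluated once, at build time -- analogous to pendulum.now().add(minutes=60):
    scheduled_start = built_at + timedelta(minutes=60)  # fixed at 14:00 forever

    def child_run_decision(now):
        # The backend starts the child immediately if the fixed time is already past.
        return "start immediately" if now >= scheduled_start else "wait"
    ```

    A run that fires the next day finds the captured timestamp long past, so the child is started at once. The usual fix is to compute the offset inside a task so it runs at execution time.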
  • j

    Jessica Smith

    08/09/2022, 2:48 PM
    Not sure if I'm missing something small or what, but I am getting a FileNotFoundError when using Git Store for my flows. From what I can see, the temporary directory is being deleted before the flow can actually be extracted. I replicated it with the below code. Anyone know what I'm doing wrong?
    clone_url = "XXXXXXX"
    
    with TemporaryGitRepo(clone_url) as temp_repo:
        flow = extract_flow_from_file(
            file_path=os.path.join(
                temp_repo.temp_dir.name,
                "path/to/flow/fil.py",
            ),
            flow_name="flow_name",
        )
    r
    • 2
    • 1
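    For context, the general rule that a `FileNotFoundError` like this usually points to (independent of Prefect's `TemporaryGitRepo` internals): anything under a temporary directory exists only while its context manager is open, so any path captured for use after exit dangles. A minimal stdlib demonstration (the file name is illustrative):

    ```python
    import os
    import tempfile

    # Files created inside a temporary directory exist only while the
    # context manager is open; any path captured for later use dangles.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "flow_file.py")
        with open(path, "w") as f:
            f.write("print('hello')\n")
        exists_inside = os.path.exists(path)   # True

    exists_after = os.path.exists(path)        # False: directory was deleted on exit
    ```

    So if any part of the extraction is deferred until after the `with` block closes, it will look exactly like the error above.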
  • p

    Patrick Tan

    08/09/2022, 3:01 PM
Hi, in 1.0 one of the task arguments is state_handlers, but it is not in 2.0. What is the new way of invoking state handlers?
    k
    • 2
    • 3
  • d

    datamongus

    08/09/2022, 3:26 PM
Is it possible to name ShellTasks? I have a flow that runs 3 ShellTasks; within the UI they all say ShellTask.
    a
    • 2
    • 1
  • s

    Sam Garvis

    08/09/2022, 3:28 PM
    $ prefect profile use development_service_account
    â ‹ Connecting...
    Error authenticating with Prefect Cloud using profile 'development_service_account'
    No matter what I do I cannot connect to our Prefect 2.0 Cloud environment from my terminal. When I try to run a flow locally I get
RuntimeError: Cannot create flow run. Failed to reach API at https://api.prefect.cloud/api/accounts ...
    My account_id and workspace_id are correct, I have checked. I don't understand why I cannot connect
    âś… 1
    m
    k
    • 3
    • 15
  • t

    Tarek

    08/09/2022, 4:02 PM
hello, I updated my application to Prefect 2.0.2 and now the flows are crashing, even with very simple code..
    k
    m
    +2
    • 5
    • 36
  • t

    Tony Yun

    08/09/2022, 4:08 PM
Hi, what is a way to pass the local
    .prefect/config.toml
    file to the runtime environment? For example, I’m writing
    pytest
    tests in VS Code; when I run the tests, they always fail because they can't find the secrets specified in that config file. Running
    python flow.py
    from the CLI works fine.
    k
    m
    • 3
    • 11
  • o

    Oscar Björhn

    08/09/2022, 4:29 PM
    Is it not possible to get each deployment's associated id somehow, using the python client library? I can get a list of deployments including metadata such as their name, storage id, deployment id etc, but I don't see the deployment id itself..
    âś… 1
    k
    • 2
    • 4
  • o

    Oscar Björhn

    08/09/2022, 4:49 PM
    Okay, follow-up question. Is there any way to create a subflow by flow name, id, deployment_id or something like that? I've tried using create_flow_run_from_deployment but the flow it creates is not a subflow, so it's not suitable for a flow-of-flows scenario. I'm trying to avoid using direct flow-object references since my flows will be deployed individually, using separate docker containers etc. If it's not possible, that's alright, I'll just have to rethink my strategy in that case. I know I've asked a similar question before but the response to that was "use create_flow_run_from_deployment", which I've finally had time to attempt and it's not what I need. 🙂
    m
    a
    • 3
    • 11
  • m

    Mars

    08/09/2022, 6:42 PM
Hi all, the Prefect 2.0 Kubernetes agent auto-generated manifest appears to be a lot smaller than the 1.0 manifest. There are far fewer command-line switches and envvar options (just
    PREFECT_API_URL
    ?). Are there additional agent settings in 2.0 I should be aware of for running a production k8s deployment, like the work queue name? If so, do they have envvars, and where in the docs can I read about them?
    a
    • 2
    • 6
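    As a point of reference, a hypothetical excerpt of what a 2.0 agent Deployment might carry (an assumption sketched from the 2.0 CLI, not the official manifest): `PREFECT_API_URL` and `PREFECT_API_KEY` are real Prefect 2 settings, and the work queue name is passed as an argument to `prefect agent start` rather than via an envvar.

    ```yaml
    # Hypothetical excerpt from a Prefect 2.0 agent Deployment manifest
    containers:
      - name: agent
        image: prefecthq/prefect:2.0.3-python3.9
        command: ["prefect", "agent", "start", "kubernetes"]  # work queue name as an argument
        env:
          - name: PREFECT_API_URL
            value: https://api.prefect.cloud/api/accounts/<account-id>/workspaces/<workspace-id>
          - name: PREFECT_API_KEY
            valueFrom:
              secretKeyRef: { name: prefect-api-key, key: key }
    ```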
  • p

    Patrick Tan

    08/09/2022, 7:42 PM
Hi, do you have an example of create_flow_run_from_deployment that waits for completion of the flow run before continuing to the next line of code in 2.0? I was using create_flow_run and wait_for_flow_run in 1.0.
    đź‘€ 1
    b
    m
    • 3
    • 7
  • m

    Matt Delacour

    08/09/2022, 7:55 PM
How can I query "Service Accounts" from GraphQL? I just want to query which ones are already created.
    a
    • 2
    • 2
  • s

    Simon Macklin

    08/09/2022, 8:16 PM
hey Prefect. I am looking to build a Prefect v2 client in Golang. This would then be used to build a Terraform provider or Kubernetes controller to create resources using infra as code. Maybe I just missed this in the docs, but how am I able to get a token to authenticate to Prefect Cloud v2?
    k
    • 2
    • 4
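    Prefect Cloud 2 authenticates with an API key sent as a bearer token, so any HTTP client (Go's net/http included) just needs the right header. Sketched here with the Python stdlib to match the rest of the thread; the key value and endpoint path are illustrative:

    ```python
    import urllib.request

    API_KEY = "pnu_xxx"  # an API key created in the Prefect Cloud UI
    BASE = "https://api.prefect.cloud/api"

    # Build an authenticated request; the Go equivalent sets the same
    # "Authorization: Bearer <key>" header on an http.Request.
    req = urllib.request.Request(
        f"{BASE}/me/",  # illustrative endpoint
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    ```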
  • r

    Ross Teach

    08/09/2022, 8:26 PM
Prefect v2 appears to prefer throwing an exception to abort a flow early based on a condition (https://discourse.prefect.io/t/how-can-i-stop-a-flow-run-execution-based-on-a-condition/105). However, this results in a failed run. Is there any way to end a flow early conditionally without a failed state? For example, we would like to run a daily dbt rollup when our source table has 24 hours of data; if not, we should skip the dbt task. In this case, skipping the task is expected and should not result in a failed state.
    a
    • 2
    • 2
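    Since Prefect 2 flows are ordinary Python functions, one pattern (a sketch, not an official recipe) is a plain early `return` when the precondition fails: the run finishes successfully and the downstream task is simply never called. The function names here are illustrative:

    ```python
    def has_full_day_of_data(hours):
        # Stand-in for the real freshness check against the source table.
        return hours >= 24

    # In a real @flow-decorated function the same shape applies: check the
    # condition first and return early so the dbt task is never invoked.
    def daily_rollup(hours_available):
        if not has_full_day_of_data(hours_available):
            return "skipped: source not ready"   # flow still ends in a non-failed state
        return "dbt rollup ran"
    ```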
  • m

    Matt Delacour

    08/09/2022, 8:42 PM
Also, anything more informative on GraphQL about why I cannot delete a specific user would be nice 🙏 What do you think is the problem with the following query?
    a
    • 2
    • 1
  • j

    John Archer

    08/09/2022, 11:35 PM
Hello all, is there a way to set the path for the
    manifest.json
    file that is created from the
    deployment build
    CLI command? I see you can set the
    --output
    for the
    deployment.yaml
    file. I have a number of flows in a single repo and would like to keep the structure clean. I know I can run the command from within the directory that I have the flow in, but I am looking to run the commands from the project root. This is also so I can simplify CI/CD jobs in the future. Any help would be appreciated.
    m
    • 2
    • 2
  • t

    Thuy Tran

    08/10/2022, 4:46 AM
I'm running the latest Prefect version, 2.0.3, and getting the error below. The error isn't a showstopper, since the run didn't stop at this point. Also, I noticed the 'update-record' flow failed with a validation error, but the UI log doesn't show anything; only the terminal running the agent shows the error. I've seen this "Orion logging error" in version 2.0.2 and I think 2.0.1.
    00:35:13.071 | DEBUG   | Flow run 'logical-lemming' - Resolving inputs to 'update-record'
    00:35:14.896 | DEBUG   | prefect.client - Connecting to API at <http://127.0.0.1:4200/api/>
    00:35:17.563 | DEBUG   | prefect.agent - Checking for flow runs...
    00:35:22.591 | DEBUG   | prefect.agent - Checking for flow runs...
    --- Orion logging error ---
    The log worker encountered a fatal error.
    Traceback (most recent call last):
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/site-packages/prefect/logging/handlers.py", line 82, in _send_logs_loop
        anyio.run(self.send_logs)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/asyncio/runners.py", line 47, in run
        _cancel_all_tasks(loop)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/asyncio/runners.py", line 56, in _cancel_all_tasks
        to_cancel = tasks.all_tasks(loop)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/asyncio/tasks.py", line 53, in all_tasks
        tasks = list(_all_tasks)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/_weakrefset.py", line 65, in __iter__
        for itemref in self.data:
    RuntimeError: Set changed size during iteration
    Worker information:
        Approximate queue length: 0
        Pending log batch length: 0
        Pending log batch size: 0
    00:35:27.618 | DEBUG   | prefect.agent - Checking for flow runs...
    00:35:32.655 | DEBUG   | prefect.agent - Checking for flow runs...
    00:35:37.680 | DEBUG   | prefect.agent - Checking for flow runs...
    00:35:42.713 | DEBUG   | prefect.agent - Checking for flow runs...
    00:35:46.427 | DEBUG   | prefect.flows - Parameter 'target_collection' for flow 'update-record' is of unserializable type 'Collection' and will not be stored in the backend.
    00:35:46.427 | DEBUG   | prefect.flows - Parameter 'sys_collection' for flow 'update-record' is of unserializable type 'Collection' and will not be stored in the backend.
    00:35:46.427 | DEBUG   | prefect.flows - Parameter 'metadata_collection' for flow 'update-record' is of unserializable type 'Collection' and will not be stored in the backend.
    00:35:47.751 | DEBUG   | prefect.agent - Checking for flow runs...
    00:35:48.177 | INFO    | Flow run 'logical-lemming' - Created subflow run 'knowing-avocet' for flow 'update-record'
    00:35:48.395 | ERROR   | Flow run 'knowing-avocet' - Received invalid parameters
    Traceback (most recent call last):
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/site-packages/prefect/engine.py", line 433, in create_and_begin_subflow_run
        parameters = flow.validate_parameters(parameters)
      File "/Applications/anaconda3/envs/datapipeline/lib/python3.9/site-packages/prefect/flows.py", line 275, in validate_parameters
        raise validation_err
    prefect.exceptions.ParameterTypeError: 1 validation error for UpdateRecord
    end_date
      str type expected (type=type_error.str)
    00:35:48.435 | INFO    | Flow run 'logical-lemming' - Created task run 'update_record_in_db-a12634ec-0' for task 'update_record_in_db'
    00:35:48.435 | INFO    | Flow run 'logical-
    a
    • 2
    • 2
  • v

    Vadym Dytyniak

    08/10/2022, 8:15 AM
    Hi. Do you have any idea why I receive such warnings on the k8s agent?
    WARNING - agent | Job 'prefect-job-623be8f6' is for flow run '958c388f-f155-4ee8-a279-40c546b99808' which does not exist. It will be ignored.
  • b

    Benjamin.bgx

    08/10/2022, 9:16 AM
Hey everyone, I have a little question regarding block storage (specifically the Azure storage block). My understanding is that we use the block as the primary unit for Prefect. There are two main uses for them: 1. store persistent logs, 2. store the flow. I understand that with this Python code I can define (and create) the block where I want to store the flow:
    block = Azure(azure_storage_connection_string="paste_the_string_here")
    block.save("dev")
    and with this command line I finalize the storage information for the flow:
    prefect deployment build flow.py:flowname \
     --name deploy_name --tag dev -sb azure/dev
    o
    a
    i
    • 4
    • 7
  • v

    Vincent Chéry

    08/10/2022, 11:36 AM
Hi! I'm having a problem that's been impacting our system in production for months now. In an apparently random fashion, from once a month to once every few days, a request from a flow runner to the server API times out, and that's the start of a major sh*t storm which usually ends up in having to manually restart the machine. (Details in the next messages...)
    b
    • 2
    • 9
  • s

    Sudharshan B

    08/10/2022, 12:11 PM
Hi! I'm facing an error when using a for loop inside the 'with Flow("flow_name") as flow' syntax: TypeError: object of type 'GetItem' has no len(). How can I use a for loop inside the flow (Prefect v1.2.2)? Any suggestions?
    m
    • 2
    • 1
  • c

    Christian Vogel

    08/10/2022, 12:18 PM
Hi Prefect Community, I am facing the following issue:
    (begin_task_run pid=81126)     from prefect.packaging.docker import DockerPackager
    (begin_task_run pid=81126) ImportError: cannot import name 'DockerPackager' from partially initialized module 'prefect.packaging.docker' (most likely due to a circular import) (/home/christian/Documents/ray_and_prefect/temp/temp-venv/lib/python3.9/site-packages/prefect/packaging/docker.py)
    t
    a
    • 3
    • 16
  • p

    Paul Lucas

    08/10/2022, 12:35 PM
Hi all, I’m attempting to implement a flow where a task only runs if any of the direct upstreams fail, so I’m using
    trigger=any_failed
    which works fine when one of those does fail. But when those upstream tasks are all successful, I seem to get a
    TriggerFailed
    on that task and therefore the flow run is
    FAILED
    . Is that expected behaviour, and if so, how do I instead get it to just skip that task rather than marking it as Failed and therefore the whole flow as failed? Thanks in advance
  • k

    Kelvin

    08/10/2022, 12:37 PM
    Hi! Now that 2.0 is GA, is there a Prefect 2.0 published roadmap somewhere of future plans for next few months (or beyond)?
    t
    • 2
    • 1
  • l

    Lucien Fregosi

    08/10/2022, 12:54 PM
Hi Prefect team, regarding schedules: is it possible to build the YAML file with a
    --schedule
    option (like we do for the storage block, for instance)? In my automated process I can’t edit the deployment.yaml file, as I re-build this file every time in case something has changed
    t
    • 2
    • 6
  • v

    Vlad Tudor

    08/10/2022, 12:59 PM
Hi, I am fairly new to Prefect so any help (resources / tips & tricks etc.) is appreciated. I want to set up a Prefect Server and a Local Agent. I got a
    docker-compose.yaml
    from running
    prefect server config
    (posted below). I should now add a new service at the end that runs a Local Agent, and a service that registers my flow at startup — is that correct? For now, if I add a service that runs
    prefect agent local start
    I get
    ValueError: You have not set an API key for authentication.
    when running
    docker-compose up
    .
    • 1
    • 1
  • m

    Matt Delacour

    08/10/2022, 1:13 PM
What's the default value of "heartbeat_mode"? process, thread, or off? https://docs-v1.prefect.io/orchestration/concepts/services.html#heartbeat-configuration
    a
    t
    • 3
    • 6
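    Per the linked v1 docs, heartbeat behaviour is controlled in config.toml; to my recollection the default mode is "process", but treat that as an assumption and confirm against the docs page. A hypothetical override would look like:

    ```toml
    # ~/.prefect/config.toml (Prefect 1.x) -- hypothetical override
    [cloud]
    heartbeat_mode = "thread"   # options: "process" (default, I believe), "thread", "off"
    ```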
  • n

    Nikhil Joseph

    08/10/2022, 1:43 PM
Hi, I'm migrating my pipelines from v1 to v2. Each of the tasks returns a large amount of data, so movement of data from task to task is quite slow. I noticed that the results are being stored in local storage, and if I'm right, that's why it's so slow. I removed that setting in v1 (don't really remember how) and was wondering if such an option is available in v2
    t
    • 2
    • 2
t

Taylor Curran

08/10/2022, 1:47 PM
Hi Nikhil, great question! The ability to change how task results are persisted is currently being worked on by our engineers and should be available before the end of the quarter. I’m not sure on the best workaround in the meantime, but I’ll try to see if someone else knows.
n

Nikhil Joseph

08/10/2022, 1:49 PM
uh oh, guess I'll postpone the migration for a while
👍 1
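A common interim workaround (generic Python, not a Prefect API) is to hand large payloads between tasks by reference: write the data once to shared storage and pass only the path, so the orchestration layer never moves or serializes the payload itself. A minimal sketch with illustrative names:

```python
import json
import os
import tempfile

def produce(data, out_dir):
    # "Task" 1: write the large payload once and return only its path.
    path = os.path.join(out_dir, "payload.json")
    with open(path, "w") as f:
        json.dump(data, f)
    return path

def consume(path):
    # "Task" 2: receives a small string and loads the data itself.
    with open(path) as f:
        return json.load(f)

with tempfile.TemporaryDirectory() as d:
    ref = produce({"rows": list(range(5))}, d)
    result = consume(ref)
```

In real pipelines the shared location would be a bucket or network volume reachable from every task's runtime environment.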