prefect-community
  • s

    Stephen Lloyd

    04/18/2022, 4:22 AM
    Hi, I’m getting an error I don’t understand, along with unexpected behavior:
    k
    • 2
    • 6
  • l

    Leanna Morinishi

    04/18/2022, 6:07 AM
    Hello there! I have a task that most of the time I want to depend on a single upstream task. However, at the end of the flow, I want it to run if `any_successful` for several upstream tasks. When I write it like below, `task_5` doesn’t fail, even if `task_3` and `task_4` fail. Is it because the creation of the list `[task_3, task_4]` succeeds? How should I write this flow instead? Many thanks!
    task_3 = my_task(
        "input1", task_args=dict(name="input1")
        ).set_upstream(task_1)
    
    task_4 = my_task(
        "input2", task_args=dict(name="input2")
        ).set_upstream(task_2)
    
    task_5 = my_task("all_inputs",
        task_args=dict(name="all_inputs", trigger=any_successful),
        ).set_upstream([task_3, task_4])
    e
    • 2
    • 2
  • a

    Amir Shur

    04/18/2022, 11:19 AM
    Hi! There's a Prefect feature I was looking for but couldn't quite find, and I'm wondering if anyone knows how to achieve it: I'd like to pause a flow if a certain condition is met and, if so, allow the flow to continue after a new input file is entered as a parameter. I know there's this: https://docs.prefect.io/core/idioms/pause-for-approval.html but I couldn't find an option for more complicated input than approve/disapprove. Any help will be much appreciated, thx!
    k
    • 2
    • 3
  • m

    Matthew Seligson

    04/18/2022, 12:44 PM
    I have a flow scheduled to run on the second weekday of October, but I don’t see it in the “upcoming runs” or upcoming schedule of the flow in the UI. Is this expected?
    k
    • 2
    • 6
  • x

    Xavier Babu

    04/18/2022, 3:45 PM
    Dear Orion community, if I need to deploy a flow (createflow or deployflow) via the Orion REST API, how can I pass the flow location so that it can pick up the Python file from that location?
    m
    • 2
    • 15
  • j

    Jack Sundberg

    04/18/2022, 4:42 PM
    Hey everyone! Just like you can do class-based Tasks, are you able to do class-based workflows? With Prefect Orion introducing the `@flow` decorator, I'm guessing this will be possible, but I don't see any docs on it.
    k
    • 2
    • 9
  • d

    Dylan

    04/18/2022, 6:32 PM
    Hey, when I try to toggle on a schedule, I get an error message in the UI back from GraphQL. This is on a Cloud backend with a K8s agent running on EKS.
    a
    k
    • 3
    • 23
  • d

    Dylan

    04/18/2022, 6:32 PM
    [{
        "data": {
            "set_schedule_active": {
                "success": false,
                "__typename": "success_payload"
            }
        }
    }]
  • s

    Shuchita Tripathi

    04/18/2022, 7:12 PM
    Hi, I am trying to create a flow based on some dynamic values. These dynamic values determine which tasks are required to create that flow. My tasks are saved in separate files. For example:
    task1.py
    def task1():
        @task
        def run_terraform_lt():
            tf = Terraform(working_dir="law_tf")
            tf.init()
            tf.apply()
    task2.py
    def task2():
        @task
        def run_terraform_rt():
            tf = Terraform(working_dir="rt_tf")
            tf.init()
            tf.apply()
    The input is similar to this:
    {
        "task1": {
          "id": "foo",
          "task_name": "task1"
        },
        "task2": {
          "id": "bar",
          "task_name": "task2"
        }
    }
    I am getting the task name from this dictionary. Based on the value of "task_name", I have to create a flow combining all tasks. I am creating a flow where I am trying to add them, but the tasks are not getting added to the flow. Does anyone have an idea how this can be achieved? Here is the snippet of my flow creation code. It is inside a for loop through which I am extracting the task name and other variables.
    k
    • 2
    • 22
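One possible reason tasks never reach the flow in a layout like the one above is that the outer `task1()` / `task2()` wrappers define a decorated function but never return it. A minimal sketch of a name-to-factory registry, using plain functions as stand-ins for Prefect tasks (the names `make_task1`, `make_task2`, `TASK_FACTORIES`, and `select_tasks` are hypothetical, and no Terraform or Prefect call is made):

```python
def make_task1():
    def run_terraform_lt():
        # Stand-in for the real Terraform init/apply in "law_tf".
        return "applied law_tf"
    return run_terraform_lt  # return the inner callable so callers can use it


def make_task2():
    def run_terraform_rt():
        # Stand-in for the real Terraform init/apply in "rt_tf".
        return "applied rt_tf"
    return run_terraform_rt


# Maps the "task_name" value from the input to the factory that builds it.
TASK_FACTORIES = {"task1": make_task1, "task2": make_task2}


def select_tasks(spec):
    """Return one callable per entry, looked up by its "task_name" value."""
    return [TASK_FACTORIES[entry["task_name"]]() for entry in spec.values()]


spec = {
    "task1": {"id": "foo", "task_name": "task1"},
    "task2": {"id": "bar", "task_name": "task2"},
}
results = [run() for run in select_tasks(spec)]
```

In a real flow, each selected object would be a task called inside the `with Flow(...)` block rather than a plain function.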
  • f

    Fina Silva-Santisteban

    04/18/2022, 8:54 PM
    Hi everyone! I’ve created a flow-of-flows which runs another flow in a loop, something like this:
    with Flow('Parent Flow') as flow:

        dates = ['2021-01-04', '2021-01-05', '2021-01-06']  # a list of dates

        for i in range(len(dates)):
            # run child flow with i as parameter
    When I check the Prefect UI it seems like the child flows are running in parallel (I’m using threads) which is in general great but in this case I’d like the child flows to be run sequentially. Is there a way to force the parent flow to run them that way?? 🤔
    k
    • 2
    • 5
  • r

    RAISS Zineb

    04/18/2022, 11:39 PM
    Hello community, I tried to connect Prefect with my SQL Server database, but I got this error. Has anyone already seen this error, or does anyone have an idea what causes it?
    k
    a
    • 3
    • 22
  • j

    Jacob Blanco

    04/19/2022, 4:23 AM
    I'm trying to ship logs from our containerized flows executed on EC2 to DataDog, and I’m trying to figure out the best approach. As far as I can tell we can:
    a) Use the DataDog agent already on the EC2 instance to collect logs from the containers
    b) Add the DataDog agent to the flow images and write a custom logger to ship the logs to statsd
    c) Something else I haven’t thought about
    I think a) has the advantage that we don’t need to do any custom coding on each flow and we get all the logs for all flow runs for free. Anyone have experience with this approach? According to this: https://docs.datadoghq.com/agent/docker/log/ it seems all we need is to enable Docker log collection and maybe configure the logs in the Dockerfile.
    a
    • 2
    • 3
  • o

    Omar Sultan

    04/19/2022, 8:13 AM
    Hello Everyone, We currently have a setup of Prefect Core Server running on our on-prem environment using Kubernetes. We are running our flows using a LocalDaskExecutor and we noticed that the logs for these flows do not show up on the portal. Only logs for flows that do not use a LocalDaskExecutor show logs. Is there a special configuration required for Dask to send the logs?
    a
    • 2
    • 9
  • s

    Sergey Gerasimov

    04/19/2022, 8:24 AM
    Hi folks, does somebody deal with so-called business rules management systems (Drools, Camunda)? Can Prefect be considered an alternative to them? Maybe it's a good choice to build your own business rules management system on top of Prefect? Drools and Camunda are Java-based and heavy. I'm looking for a lightweight Pythonic solution, like a stream/batch DAG management system (btw, is Prefect really quick at processing streams?) with visualization of graphs and ideally with the ability to separate conditional expressions (like if ... then ...) from other code.
    a
    • 2
    • 1
  • j

    Joshua Greenhalgh

    04/19/2022, 12:31 PM
    Hi all, wonder if anyone can point me to an official task for pulling a git repo? I want to use such a task in the context of running DBT. Can I perhaps use something from the underlying GitHub tasks? I don't see an explicit clone task...
    a
    • 2
    • 15
  • m

    Mini Khosla

    04/19/2022, 1:31 PM
    I have prefect server 0.15.7 deployed. My flows were running fine a few days back. They seem to not execute properly now as for some of the mapped tasks the server is not able to set the state as Success/Fail. Here is the screen shot. I am running this on a yarn cluster. Any help will be appreciated.
    k
    • 2
    • 8
  • j

    Joshua Greenhalgh

    04/19/2022, 1:33 PM
    Hi all - wonder if anyone has any experience running prefect flows on GKE spot instances? Is there any way of dealing gracefully with possible SIGTERM signals?
    a
    k
    • 3
    • 17
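On the SIGTERM question, one generic pattern (not Prefect-specific) is to record the signal and stop at the next safe checkpoint. A minimal sketch using only the standard library; `ShutdownFlag` and `process_items` are hypothetical names, and whether the grace period before the node disappears is long enough depends on the environment:

```python
import signal


class ShutdownFlag:
    """Remembers that a termination signal arrived."""

    def __init__(self):
        self.requested = False

    def handle(self, signum, frame):
        # Keep the handler tiny: just record the request.
        self.requested = True

    def install(self):
        # Must be called from the main thread of the process.
        signal.signal(signal.SIGTERM, self.handle)


def process_items(items, flag):
    """Process items one by one, stopping early once shutdown is requested."""
    done = []
    for item in items:
        if flag.requested:
            break  # safe checkpoint: leave remaining items for a retry
        done.append(item)
    return done
```

A long-running job would call `flag.install()` once at startup and then check `flag.requested` between units of work, persisting progress before exiting.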
  • d

    Dekel R

    04/19/2022, 2:11 PM
    Hey all, I get this weird behavior once in a while - there are no logs in 1 of the flows I’m running. Most times it works fine and I get all the expected logs but sometimes only this - see the attached file. Am I missing something? Anyone have the same issue? This specific run had a bug (a python bug in the app itself that I couldn’t see..) Thanks
    logs.json
    k
    • 2
    • 5
  • p

    Pedro Machado

    04/19/2022, 2:20 PM
    Hi everyone. We are using `wait_for_flow_run` to wait for a subflow run. The subflow is a long-running job that can take more than 12 hours and sometimes it times out. I noticed that `wait_for_flow_run` uses `watch_flow_run`, which raises an exception if the flow has been running for more than 12 hours. The 12-hour timeout is hardcoded. See https://github.com/PrefectHQ/prefect/blob/afda99411f91582ad187bf33671268d8d3c3c2c0/src/prefect/backend/flow_run.py#L95 We plan to upgrade to 2.0 shortly after it's released. Is this timeout limitation for subflows also implemented in Orion?
    k
    • 2
    • 1
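For subflows that outlast a fixed wait limit, one possible workaround is to poll for completion with a timeout you control. A minimal sketch, where `wait_for` and the `check` callable are hypothetical (in practice `check` would query the backend for the subflow run's state, which is not shown here):

```python
import time


def wait_for(check, timeout_seconds, poll_seconds=10.0,
             clock=time.monotonic, sleep=time.sleep):
    """Poll check() until it returns True or timeout_seconds elapse.

    Returns True if check() succeeded, False if the deadline passed first.
    clock and sleep are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)
```

Because the timeout is a plain argument here, a 24-hour job only needs `timeout_seconds=24 * 60 * 60` rather than a code change.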
  • h

    Hugo Shi

    04/19/2022, 2:42 PM
    I have a flow that I broke recently (bug in code). Now when I try to execute it in Prefect Cloud, there are 5 flows submitted, but nothing is happening to them. The agent we have is successfully picking up and running other flows. Do I need to clear the submitted flows? And how would I do so?
    k
    • 2
    • 15
  • s

    Shuchita Tripathi

    04/19/2022, 3:24 PM
    My Prefect flow has a few tasks which run Terraform. Is there any way I can see a detailed log of the task being executed? For example, when Terraform runs, it displays its logs on the console. Is there any way I can see those logs? In the Prefect console, I see a very basic log which doesn't tell me what is going on inside the task.
    k
    • 2
    • 4
  • w

    wiretrack

    04/19/2022, 3:34 PM
    Hi guys, is there any way to have multiple "instances" of a flow? We have a single flow, but use it for N users, we wanted it to be 1 flow per user for monitoring purposes, but we can't seem to find a way to create a new flow from an existing storage, only changing a parameter. Any ideas?
    k
    • 2
    • 2
  • p

    Prasanth Kothuri

    04/19/2022, 3:51 PM
    I have a strange error on a task. Has anyone seen this before?
    Task 'copy_from_s3_to_sftp': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/dist-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/usr/local/lib/python3.9/dist-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "flows/k8s/my_flow_name.py", line 46, in copy_from_s3_to_sftp
    SystemError: unknown opcode
    k
    • 2
    • 16
  • a

    Adi Gandra

    04/19/2022, 3:58 PM
    Hey, I am trying to upgrade my Prefect agent that I have running on EKS. I upgraded my prefect package locally; it is now version 1.2. When I try to apply the changes:
    prefect agent kubernetes install  -k {key} --mem-request 4G --mem-limit 6G --cpu-request 2 --rbac | kubectl apply -f -
    Nothing seems to happen; I just get the message:
    deployment.apps/prefect-agent configured
    role.rbac.authorization.k8s.io/prefect-agent-rbac unchanged
    rolebinding.rbac.authorization.k8s.io/prefect-agent-rbac unchanged
    Any ideas on how to successfully upgrade my Prefect agent?
    k
    • 2
    • 3
  • c

    Chris Reuter

    04/19/2022, 4:28 PM
    Hot fresh dbt content from Prefectionist and all-around cool guy @James Sopkin for those of you looking to automate :dbt: exposures: https://prefect-community.slack.com/archives/CL09KTZPX/p1650385229289719
    🔥 1
    🚀 1
    :upvote: 4
  • p

    Philip MacMenamin

    04/19/2022, 7:41 PM
    Strange behaviour with multiple `upstream_tasks` on a ShellTask. This works:
    brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
    brt_commands_logged = log(item=brt_commands, desc="BRT commands")
    brts = shell_task.map(
        command=brt_commands, upstream_tasks=[tomogram_fps]
    )
    This fails:
    brt_commands = create_brt_command.map(adoc_fp=updated_adocs)
    brt_commands_logged = log(item=brt_commands, desc="BRT commands")
    brts = shell_task.map(
        command=brt_commands, upstream_tasks=[tomogram_fps, brt_commands_logged]
    )
    :discourse: 1
    k
    n
    +2
    • 5
    • 15
  • p

    Philip MacMenamin

    04/19/2022, 8:12 PM
    Quietening down logs: if I'm mapping out to a lot of jobs, have a lot of params, etc., it can be difficult to follow the logs. Is there a way I can set logging such that I only see something if it's broken? If I
    export PREFECT__LOGGING__LEVEL=INFO
    I see:
    python3 tmp/shell_task.py 
    [2022-04-19 21:09:30+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'My Flow'
    [2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'MyShellTask': Starting task run...
    [2022-04-19 21:09:30+0100] INFO - prefect.MyShellTask | lsto echo
    [2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'MyShellTask': Finished task run for task with final state: 'Success'
    [2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'problem': Starting task run...
    [2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('Oh no!')
    [2022-04-19 21:09:30+0100] INFO - prefect.TaskRunner | Task 'problem': Finished task run for task with final state: 'Failed'
    [2022-04-19 21:09:30+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    If I
    export PREFECT__LOGGING__LEVEL=ERROR
    I see nothing. Ideally I'd like to only see messages about broken stuff. Ideas?
    n
    a
    • 3
    • 24
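The sample output suggests why `ERROR` prints nothing: the failure messages themselves are emitted at `INFO`. One way to approximate "only broken stuff" is to keep the level at `INFO` and filter on message text. A minimal sketch with the standard `logging` module; the marker strings are taken from the log sample above, while `FailuresOnly`, `quiet_handlers`, and the assumption that the handlers hang off a logger named `prefect` are illustrative:

```python
import logging

# Substrings seen in the failure lines of the sample output.
FAILURE_MARKERS = ("FAIL", "Failed", "Exception")


class FailuresOnly(logging.Filter):
    """Pass WARNING and above, plus lower-level records that mention a failure."""

    def filter(self, record):
        if record.levelno >= logging.WARNING:
            return True
        message = record.getMessage()
        return any(marker in message for marker in FAILURE_MARKERS)


def quiet_handlers(logger_name="prefect"):
    """Attach the filter to every handler of the named logger (name assumed)."""
    for handler in logging.getLogger(logger_name).handlers:
        handler.addFilter(FailuresOnly())
```

Filtering on text is brittle if message wording changes, so this is best treated as a stopgap for local debugging.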
  • j

    Jason

    04/19/2022, 10:53 PM
    What's a good starting area for the local dev story? We're deploying to a production ECS cluster with a decent GitHub CI/CD so far. Ideally a simple `Makefile` would spin up a few Docker containers that one could test flows on. How can I abstract between a local and an ECS agent in my flows to allow something like an `env` var to swap between them?
    k
    a
    • 3
    • 4
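One way to swap targets with an environment variable is to resolve it once, in a single helper, and let the flow definition consume the result. A minimal sketch under stated assumptions: the variable name `FLOW_TARGET`, the `TARGETS` table, and the label values are all hypothetical placeholders, and the mapping from a target name to an actual run configuration is left out:

```python
import os

# Hypothetical per-target settings; keys and values are placeholders.
TARGETS = {
    "local": {"agent_label": "local-dev"},
    "ecs": {"agent_label": "ecs-prod"},
}


def pick_target(environ=None):
    """Choose a target from the FLOW_TARGET variable, defaulting to local."""
    environ = os.environ if environ is None else environ
    name = environ.get("FLOW_TARGET", "local").strip().lower()
    if name not in TARGETS:
        raise ValueError(f"unknown FLOW_TARGET: {name!r}")
    return name, TARGETS[name]
```

A `Makefile` target could then export `FLOW_TARGET=local` before registering flows, while CI leaves it set to `ecs`.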
  • j

    Jai P

    04/20/2022, 12:58 AM
    Are there any best practices around testing flows/tasks in Prefect 2.0? I see this page, but we're noticing that testing can be particularly slow on `flow`s (sometimes taking ~1s to start up each test), and it appears we always need to wrap `task`s inside of a flow to test them.
    :discourse: 1
    m
    m
    +2
    • 5
    • 24
  • j

    Josh

    04/20/2022, 3:21 AM
    Any idea what might be causing errors with artifact generation? I’m getting these errors in my task runs:
    Error during execution of task: ClientError([{'path': ['create_task_run_artifact'], 'message': 'Task run <task_run_id> not found', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    k
    v
    • 3
    • 15
k

Kevin Kho

04/20/2022, 3:23 AM
I think so. Artifacts are attached to tasks and use the task id as an identifier. You might be making one outside of a task?
j

Josh

04/20/2022, 5:09 PM
it’s made from the task that it’s in.
Or at least I was trying to call create artifact from within the task.run method
k

Kevin Kho

04/20/2022, 5:11 PM
Maybe we can try to grab it from context to pass it in manually, one sec
So it tries to just pull it, but I guess it can’t. Is that a `task.run()` inside a `task` inside a `Flow`? Or just a task.run not in a Flow?
j

Josh

04/20/2022, 5:16 PM
something like
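from prefect import Task, Flow

class MyTask(Task):
  def run(self):
    # do something
    create_markdown()  # artifact call; import and arguments omitted in this sketch

my_task = MyTask()

with Flow("my flow") as flow:
  my_task()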
k

Kevin Kho

04/20/2022, 5:17 PM
This looks pretty right to me. Would you be able to give me a minimal example?
Or I can try in a couple of hours
j

Josh

04/20/2022, 10:07 PM
I don’t think this is an easily reproducible issue. The problem occurred in a few mapped tasks of a flow run. I can DM you the IDs of the tasks if you want so you can try to locate the logs of those attempts. The errors indicate that something may have been logged on the Prefect Cloud side that could tell you what the problem was.
k

Kevin Kho

04/20/2022, 10:20 PM
We don't have any more logs than the ones you see on your Flow either. I can test this tonight.
So I got around to trying, is your code just something like this?
from prefect import Task, Flow
from prefect.backend.artifacts import create_markdown_artifact

class MyTask(Task):
    def run(self, x):
        create_markdown_artifact("# Heading\n\nText with [link](https://www.prefect.io/).")

mytask = MyTask()
with Flow("artifact_test") as flow:
    mytask.map(list(range(100)))

flow.register("databricks")
j

Josh

04/21/2022, 6:58 PM
yeah exactly
we run it on docker agent with local dask executor if that makes any difference
k

Kevin Kho

04/21/2022, 6:59 PM
I don't think the Docker agent will affect this, but I can try the local Dask executor
v

Vinny Tunnell

05/15/2022, 10:48 PM
I am facing the same issue with a mapped task that calls `create_markdown_artifact`. This was working fine for us for the last few weeks, but this error was thrown multiple times yesterday. We are using LocalDaskExecutor with a Kubernetes agent.