prefect-community
  • y

    YD

    08/17/2021, 1:30 AM
    Trying to trigger a flow after another flow ends, following the instructions in https://docs.prefect.io/core/idioms/flow-to-flow.html. When registering the flows A, B, C and D in the example, I assume we register them without a schedule. Is this correct? I also do not see in the Python example code the
    flow.register(project_name='example')
    call. We do need to run
    flow.register
    , right?
    k
    1 reply · 2 participants
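    For reference, a minimal sketch of the flow-of-flows idiom in question, assuming a project named 'example' and child flows registered without schedules (flow names here are illustrative):

    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # wait=True blocks the parent until the child run reaches a final state
    start_a = StartFlowRun(flow_name="A", project_name="example", wait=True)
    start_b = StartFlowRun(flow_name="B", project_name="example", wait=True)

    with Flow("parent") as flow:
        b = start_b(upstream_tasks=[start_a()])

    # and yes: the parent (and each child) still needs a register call
    flow.register(project_name="example")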
  • s

    Samuel Tober

    08/17/2021, 8:46 AM
    Hey! Is there a way to get the number of successful runs of a certain flow during the execution of the flow? I want to build a state handler that outputs the number of successful runs.
    k
    2 replies · 2 participants
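    A hedged sketch of one way to do this from inside a state handler: query the backend with the Client. The GraphQL shape below assumes the Hasura-style schema exposed by Prefect Cloud/Server, so verify it against your version:

    import prefect
    from prefect import Client

    def count_success_handler(task, old_state, new_state):
        client = Client()
        result = client.graphql(
            """
            query($flow_id: uuid) {
              flow_run_aggregate(
                where: {flow_id: {_eq: $flow_id}, state: {_eq: "Success"}}
              ) {
                aggregate { count }
              }
            }
            """,
            variables={"flow_id": prefect.context.get("flow_id")},
        )
        # result.data.flow_run_aggregate.aggregate.count holds the tally
        return new_state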
  • c

    Chhaya Vankhede

    08/17/2021, 9:45 AM
    Hello, I wanted to know if there is a way to pause a flow's schedule and delete a flow. I tried looking in the docs but couldn't find anything (maybe I missed something). I can delete the flow and stop the schedule using the UI, but I need a way to do both from code itself.
    k
    5 replies · 2 participants
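    A hedged sketch of doing both from code via the GraphQL API; the mutation names assume the Prefect Cloud/Server schema, so double-check them in the Interactive API tab:

    from prefect import Client

    client = Client()
    flow_id = "<your-flow-id>"

    # pause the schedule
    client.graphql(
        'mutation { set_schedule_inactive(input: {flow_id: "%s"}) { success } }' % flow_id
    )

    # delete the flow entirely
    client.graphql(
        'mutation { delete_flow(input: {flow_id: "%s"}) { success } }' % flow_id
    )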
  • b

    Brett Jurman

    08/17/2021, 4:50 PM
    For running Prefect with Coiled, is there a way to set the resource requirements needed for a task to run (maybe on a per-task basis)? I'm seeing multiple tasks scheduled on the same machine while the rest sit idle.
    k
    4 replies · 2 participants
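    A sketch using Prefect's Dask resource tags, which map tags of the form "dask-resource:KEY=NUM" onto Dask resource annotations, assuming the Dask workers advertise matching resources (e.g. GPU=1):

    from prefect import task

    @task(tags=["dask-resource:GPU=1"])
    def train():
        ...  # only scheduled on workers advertising a GPU resource

    @task(tags=["dask-resource:process=1"])
    def heavy_io():
        ...  # caps per-worker concurrency at the advertised 'process' count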
  • m

    Madison Schott

    08/17/2021, 5:08 PM
    What would be the correct syntax for naming my tasks? The
    name=""
    suggested in the documentation isn't working for the format I have my tasks in:
    campaign_details_dbt_run = dbt_task(name="Campaign Details", command='dbt run -m campaign_details')
    k
    c
    9 replies · 3 participants
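    A sketch of the usual fix in Prefect 0.x: already-instantiated tasks (like dbt_task here, assumed to be a DbtShellTask instance defined elsewhere) are renamed at call time via task_args:

    from prefect import Flow

    with Flow("dbt") as flow:
        campaign_details_dbt_run = dbt_task(
            command="dbt run -m campaign_details",
            task_args={"name": "Campaign Details"},
        )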
  • i

    Italo Barros

    08/17/2021, 6:48 PM
    What's the difference between signals.FAIL and signals.TRIGGERFAIL? Also, is there an option to raise signals.RETRY but only retry the task after the specified retry_delay? If I call it inside a task, it enters an infinite loop.
    z
    k
    4 replies · 3 participants
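    On the retry-delay part, a hedged sketch: the RETRY signal forwards keyword arguments to the Retrying state, so a start_time should schedule the retry instead of looping immediately (worth verifying on your Prefect version):

    import pendulum
    from prefect import task
    from prefect.engine.signals import RETRY

    @task
    def flaky():
        raise RETRY(
            "retrying in 5 minutes",
            start_time=pendulum.now("UTC").add(minutes=5),
        )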
  • y

    YD

    08/17/2021, 6:52 PM
    Importing issues. There was a post on this topic, but I can't find it... If I have a project with the following structure:
    -- project
    -- source_code
    -- test_code
    -- prefect_workflows
    it appears to be a problem to use, in the Prefect flow, functions from the
    source_code
    folder that have local imports, and it seems we need to create a Python package in order to use local code. Is this correct, or is there some way around it?
    m
    k
    3 replies · 3 participants
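    Not Prefect-specific, but a minimal sketch of the usual workaround: make source_code an installable package so its local imports resolve wherever the flow runs (the file name and layout are assumptions about this project):

    # project/setup.py
    from setuptools import setup, find_packages

    setup(
        name="source_code",
        version="0.1.0",
        packages=find_packages(),
    )

    With that in place, `pip install -e .` in the project root (and the same install baked into the execution image) lets flows import from source_code directly.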
  • b

    Blake Hughes

    08/17/2021, 7:24 PM
    dagster
  • b

    Blake Hughes

    08/17/2021, 7:24 PM
    Sorry. Please ignore.
  • t

    Timothy Byrne

    08/17/2021, 8:26 PM
    Simple(ish) question, but how do you get the results of a
    SnowflakeQuery
    task into a Pandas dataframe?
    k
    2 replies · 2 participants
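    A sketch, assuming SnowflakeQuery hands back the cursor's fetchall() output (a list of row tuples), so a small downstream task can wrap it (connection details and column names are placeholders):

    import pandas as pd
    from prefect import task, Flow
    from prefect.tasks.snowflake import SnowflakeQuery

    query = SnowflakeQuery(
        account="<account>", user="<user>", password="<password>",
        database="<db>", warehouse="<wh>",
    )

    @task
    def to_dataframe(rows):
        # pass column names explicitly; fetchall() does not include them
        return pd.DataFrame(rows, columns=["col_a", "col_b"])

    with Flow("snowflake-to-pandas") as flow:
        rows = query(query="SELECT col_a, col_b FROM my_table")
        df = to_dataframe(rows)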
  • m

    Mark McDonald

    08/17/2021, 9:30 PM
    If I encounter a certain condition in a flow run, I'd like to cancel the flow run (rather than fail it or mark it successful). It doesn't seem like there is a way to do this from prefect.engine.signals. Is there any way to do this (apart from the UI)? https://docs.prefect.io/core/concepts/execution.html#state-signals
    k
    2 replies · 2 participants
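    A hedged sketch of one workaround: there is no CANCEL signal, but the run's id is in context, so a task can ask the backend to cancel its own flow run through the Client (method availability depends on your Prefect version):

    import prefect
    from prefect import task, Client
    from prefect.engine.signals import SKIP

    @task
    def maybe_cancel(condition):
        if condition:
            flow_run_id = prefect.context.get("flow_run_id")
            Client().cancel_flow_run(flow_run_id=flow_run_id)
            raise SKIP("cancellation requested")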
  • y

    YD

    08/17/2021, 9:46 PM
    Is there a way to get an alert if the connection to an agent becomes inactive?
    k
    9 replies · 2 participants
  • h

    Hui Zheng

    08/17/2021, 10:04 PM
    Hello, I have a flow that sets the DEFAULT retries greater than 0:
    "PREFECT__TASKS__DEFAULTS__MAX_RETRIES": 2,
            "PREFECT__TASKS__DEFAULTS__RETRY_DELAY": 5,
    Now I want to set an individual task_A to
    retry=0
    (not to retry at all), and I'm finding it impossible to do. I will explain the details in the thread.
    k
    m
    25 replies · 3 participants
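    For context, a sketch of the override being attempted; if the env-level defaults win over a falsy per-task value (an assumption that would explain this report), max_retries=0 would be silently ignored:

    from prefect import task

    @task(max_retries=0)  # attempt to disable retries for just this task
    def task_A():
        ...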
  • c

    Carter Kwon

    08/17/2021, 10:07 PM
    Hello, we use ECS to run our flows and lately I've been noticing that the task definition is created and immediately becomes
    [INACTIVE]
    . It's particularly strange because the inactive task definition shows "none" for the task role, but there is a role attached. I'll add a screenshot in the thread. The flow finishes successfully and writes to an s3 bucket that requires the role permissions. Any ideas what could be happening?
    k
    26 replies · 2 participants
  • m

    mithalee mohapatra

    08/17/2021, 11:30 PM
    I am trying to use a dataclass as a parameter to my task, but I get the error below while registering the flow. I am using Prefect version 0.14.6.
    Error:
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 151, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
    AttributeError: Can't get attribute 'WriteConfig' on <module 'pypline.prefect.tasks.hudi_writer_eks' from '/usr/local/lib/python3.8/site-packages/pypline/prefect/tasks/hudi_writer_eks.py'>
    Prefect code:
    import boto3
    import prefect
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException
    from prefect import task, Flow, Parameter
    from prefect.engine.signals import FAIL, SKIP
    from pypline.lib.s3 import S3URI
    from dataclasses import dataclass

    @dataclass
    class WriteConfig:
        target_base_path: str
        target_table_name: str
        pk_column: str
        partition_column: str
        pre_combine_column: str
        source_path: str

        def validate(self):
            if not self.target_base_path:
                raise FAIL("'target_base_path' is required for hudi_options.")
            # TODO: validate that the required params are not empty

    @task
    def load_configs(executor_instances, hudi_options: WriteConfig):
        hudiparms = WriteConfig(
            target_base_path=hudi_options[0],
            target_table_name=hudi_options[1],
            pk_column=hudi_options[2],
            partition_column=hudi_options[3],
            pre_combine_column=hudi_options[4],
            source_path=hudi_options[5],
        )
        print(hudiparms)

    with Flow('Write Table', executor=get_dask_executor(for_k8, 4, 25, 'mmm/mmm-phase-0:' + mmm_version)) as hudi_flow:
        executor_instances = Parameter('executor_instance', default=1)
        hudi_options = Parameter('hudi_options', default=[
            "test_write", "test_table", "id", "one", "two", "userdata1.parquet",
        ])
        load_configs(executor_instances, hudi_options)
    k
    15 replies · 2 participants
  • a

    Adam Shamlian

    08/18/2021, 12:04 AM
    I have a question regarding passing `LocalResult()`s in a flow of flows while testing the parent flow. The pattern here (subflow result passing) makes sense, but it seems like it's designed for running in prod on Server or Cloud. Is there a way to fudge a
    flow_id
    for
    get_task_run_result
    when just calling
    flow.run()
    for testing, or should I be thinking about this in an entirely different way when testing?
    k
    2 replies · 2 participants
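    A minimal sketch of the local-testing alternative: flow.run() returns a State whose .result dict maps each task to its own State, so the value get_task_run_result would fetch is available without any backend or flow_run_id (names here are illustrative):

    from prefect import Flow, task

    @task
    def produce():
        return 42

    with Flow("child_A") as child_A:
        out = produce()

    state = child_A.run()
    assert state.result[out].result == 42  # same data, no flow_run_id needed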
  • a

    Alexander Seifert

    08/18/2021, 8:43 AM
    Hello! Beginner question: in DVC's pipeline feature, a stage will only be re-executed if something in the stage definition or its dependencies changes (see https://dvc.org/doc/user-guide/project-structure/pipelines-files#stages). This is not default behaviour in Prefect, right? How do I do this with Prefect?
    k
    b
    21 replies · 3 participants
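    A sketch of Prefect 0.x's closest analogue: checkpointed results with a target, where the task is skipped if the target file already exists. Unlike DVC, the cache key only tracks whatever you template into the target, not code changes:

    from prefect import task, Flow
    from prefect.engine.results import LocalResult

    # requires checkpointing, e.g. PREFECT__FLOWS__CHECKPOINTING=true for local runs
    @task(
        checkpoint=True,
        result=LocalResult(dir="./cache"),
        target="prepare-{today}.pkl",  # templated from prefect.context
    )
    def prepare():
        ...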
  • a

    Adam Everington

    08/18/2021, 9:07 AM
    SqlServerExecuteMany (fast_executemany) performance vs SqlAlchemy (fast_executemany): any ideas?
    k
    6 replies · 2 participants
  • k

    Kamil Gorszczyk

    08/18/2021, 12:21 PM
    Hello everyone. We primarily use Prefect for ETL jobs, loading data from one DB/file into our own DB. We use a master flow (flow of flows) which launches the sub-flows based on their dependencies and scheduled_start_time. Sometimes the systems we get our data from report a delay of x hours. Is there any way (preferably through the UI) to pause this one flow and restart it later manually? Since there are more flows in the master flow, the others should continue as planned.
    k
    4 replies · 2 participants
  • p

    Pierre Monico

    08/18/2021, 1:31 PM
    Is there something I am missing with the new
    prefect run
    CLI
    --module
    option? • Doing
    python -m flows.myflow
    works • Doing
    prefect run -m flows.myflow
    raises
    No module named 'flows'
    k
    m
    15 replies · 3 participants
  • k

    Kim Pevey

    08/18/2021, 3:21 PM
    Hello! I have a workflow setup that works when I'm running on a local server. However, when I switch to production, which runs on a KubernetesAgent, the flow fails at runtime with
    Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
    I’m not sure how to debug this. What are the possible causes of this error?
    k
    j
    +1
    24 replies · 4 participants
  • p

    Paolo

    08/18/2021, 4:06 PM
    Hello folks, while converting an old and monolithic ETL to a cool Prefect ETL, I stumbled upon the following situation:
    @task("A small task"))
    def small_task(some_params):
        # stuff happens here
        return something
    
    @task("A complex task with many steps")
    def complex_task(lots_of_params):
        # stuff happens
        # more stuff happens
        small_task_result = small_task(some_params)
        # even more stuff happens which uses small_task_result
    My question is: what happens if I nest tasks as above? I suspect Prefect would not like it, but I haven't set up a test infrastructure yet. Of course, I could use small_task as a simple function (remove the @task decorator) or break up complex_task into smaller tasks, but both solutions would mean either leaving things as they are (awful) or rewriting a lot of code.
    k
    2 replies · 2 participants
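    A sketch of the common workaround, mirroring the snippet above: inside another task's body, call the inner task's .run() method, which executes the original function directly (no separate task run is tracked), so neither code path needs rewriting:

    from prefect import task

    @task(name="A complex task with many steps")
    def complex_task(lots_of_params):
        # stuff happens
        # more stuff happens
        small_task_result = small_task.run(some_params)  # plain function call
        # even more stuff happens which uses small_task_result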
  • a

    Adam Lancaster

    08/18/2021, 4:37 PM
    Hello Prefect team, We had a connection timeout error with https://api.prefect.io earlier today around 2:20am Pacific time. Were there any known outages around then? Is there a page that tracks outages?
    c
    2 replies · 2 participants
  • k

    kiran

    08/18/2021, 4:38 PM
    Does anyone by chance have a data engineering-specific cookiecutter recommendation? Most of the ones I'm seeing are for data science. Obviously I can make my own, but figured I'd see if other folks are using one.
    a
    2 replies · 2 participants
  • c

    Charles Leung

    08/18/2021, 4:44 PM
    Hey team, I'm having an issue where my jobs fail to pick up the "PREFECT__USER_CONFIG_PATH" environment variable that I've preset on a base Docker image, yet when I bash into the Docker container while it's running, everything works fine with flow.run(). Can you help me understand what happens when a flow is deployed? We're currently using a modified GitLab storage that recursively downloads all files into the home directory, one of which is .prefect/config.toml. For example, what's the order of operations? My hypothesis is that the flow runs globals first, then runs the GitLab storage to get the flows, then follows the tasks to execute. For more detail, I'm trying to make the run_config assigned to the flow configurable through the config.toml file:
    # Registration Configs
    flow.run_config = NomadRun(**prefect.config.run_config)
    flow.storage = GitLab(path='flow.py', **prefect.config.storage)
    k
    15 replies · 2 participants
  • w

    Wilson Bilkovich

    08/18/2021, 5:15 PM
    Are we still thinking of doing this? We evaluated Pulumi a while back and it’s very cool; I’d be up for using this: https://github.com/PrefectHQ/server/pull/171
    k
    m
    3 replies · 3 participants
  • f

    Florian Kühnlenz

    08/18/2021, 6:24 PM
    Hi community, has anyone used Prefect to track data lineage? We have a project that would greatly benefit from knowing what data flows from where to where. Since Prefect already builds a graph, we have been thinking that we just need to put some effort into structuring our flows so that load and store happen in easily identified tasks, and then ask the right questions of the resulting DAG. If anyone has done something similar already, it would be very interesting to hear how it went!
    k
    a
    +2
    15 replies · 5 participants
  • y

    YD

    08/18/2021, 7:12 PM
    I have a question on the Cloud standard offer. What does it mean by 250 nightly ETL tasks, 10K production tasks, 100K ML tasks… aren't all tasks just tasks? Do we need to define a task as ETL, production, or ML?
    k
    1 reply · 2 participants
  • a

    Adam Shamlian

    08/18/2021, 7:35 PM
    Suppose that in a flow of flows, flow A has a task that generates a "complex" object, e.g. a dataclass, that is then consumed by flow B as a parameter:
    @task(result=LocalResult(...))
    def a_do_something():
        return MyClass(1, 2, 3)
    
    with Flow('A') as child_A:
        a_result = a_do_something()
    
    with Flow('B') as child_B:
        b_param = Parameter('a_result_value')
        # option 3 below: b_param.serializer = MyClassJSONSerializer() or similar
        b_result = b_do_something(b_param)
    
    with Flow('parent') as parent:
        a_id = create_flow_run(flow_name=child_A.name)
        a_result = get_task_run_result(a_id, 'a_do_something')
        create_flow_run(flow_name=child_B.name, parameters={'a_result_value': a_result})
        # also wait for a_id, set A upstream of B, wait for b_id
    Because Parameters are/return? PrefectResults, they must be JSON serializable (indeed, if I just try the above I get a
    TypeError: MyClass is not JSON serializable
    or similar). Is the expectation that I should be messing with: 1. the serialization of LocalResult from
    a_do_something
    2. somehow injecting a serializer into Parameter (my attempts here have thus far failed - is this even possible?), or 3. modifying the
    b_param
    result (see the commented line in
    child_B
    flow) Note: based on my reading of several docs pages regarding Results and Serializers, option 2 strikes me as the most intuitive place to look. That is, it seems Prefect wants you to solve this there.
    k
    11 replies · 2 participants
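    A hedged sketch of a fourth option that sidesteps serializers entirely: convert the dataclass to plain JSON-able data at the flow boundary and rebuild it on the other side:

    from dataclasses import asdict, dataclass
    from prefect import task

    @dataclass
    class MyClass:
        a: int
        b: int
        c: int

    @task
    def a_do_something():
        return asdict(MyClass(1, 2, 3))  # a plain dict is JSON serializable

    @task
    def b_do_something(b_param):
        return MyClass(**b_param)  # reconstruct inside flow B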
  • e

    Eddie Atkinson

    08/19/2021, 2:32 AM
    Hi gang, I am playing around with getting data logged to stdout to show up in both the Prefect logs and my team's CloudWatch logs. Does anyone have any ideas on how to do this? The flow is super simple, but the logs aren't showing up in CloudWatch:
    from prefect.run_configs import ECSRun
    from prefect import task, Flow
    from prefect.storage import S3
    import prefect
    
    TASK_ARN = "an arn"
    STORAGE = S3(bucket="my-cool-bucket")
    LABELS = ["a label"]
    
    RUN_CONFIG = ECSRun(
        labels=LABELS,
        task_role_arn=TASK_ARN,
        image="prefecthq/prefect:latest-python3.8",
        memory=512,
        cpu=256,
    )
    
    
    @task(log_stdout=True)
    def bye_world():
        print("bye world")
    
    
    @task(log_stdout=True)
    def hello_world():
        print("Hello, World!")
    
    
    with Flow("test_flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
        hello_world()
        bye_world()
    
    
    flow.register(project_name="Tesla")
    k
    m
    7 replies · 3 participants
e

Eddie Atkinson

08/19/2021, 2:32 AM
My hunch is that I need to get a log group attached to my task definition. However, it looks like Prefect creates a new task definition on each run, so I guess I need to specify the details of that?
Never mind, I figured it out. For anyone who is interested: I created another ECS task, attached a log group, and then specified the task definition family as the
task_definition_arn
in the call to
ECSRun
. I'm using the Serverless Framework, and here is the config:
tasks:
      prefect-task:
        name: ${self:custom.ecsTaskName}-prefect-task
        image: 'prefecthq/prefect:latest-python3.8'
        desired: 0
        cpu: 256 # 0.25vcpu
        memory: 512 # 512mb
        override:
          role: ${self:custom.ecsBaseTaskRoleName}
          container: 
            Name: flow 
            # container must be called 'flow' for prefect to run tasks 
            # when using ECSRun: https://docs.prefect.io/api/latest/run_configs.html#ecsrun
            LogConfiguration:
              LogDriver: "awslogs"
              Options:
                awslogs-group: ${self:custom.ecsClusterLogGroupName}
                awslogs-region: ${self:custom.region}
                awslogs-stream-prefix: ${self:custom.ecsTaskLogPrefix}
With this flow:
RUN_CONFIG = ECSRun(
    labels=LABELS,
    task_definition_arn=TASK_DEFINITION_FAMILY,
    memory=512,
    cpu=256,
)


@task(log_stdout=True)
def bye_world():
    print("bye world")


@task(log_stdout=True)
def hello_world():
    print("Hello, World!")


with Flow("a_flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    hello_world()
    bye_world()
k

Kevin Kho

08/19/2021, 4:54 AM
@Marvin archive “Log ECS Task to CloudWatch”
m

Marvin

08/19/2021, 4:54 AM
https://github.com/PrefectHQ/prefect/issues/4883
k

Kevin Kho

08/19/2021, 4:55 AM
Hey @Eddie Atkinson, thanks for circling back. I was gonna suggest doing it on the agent side like this, but I think the run config approach is better.
e

Eddie Atkinson

08/19/2021, 5:02 AM
Hi @Kevin Kho, thanks for your reply. I think my approach works best for my use case, as members of my team can now define arbitrary flows to run with that task definition and always know that they're going to get logs on CloudWatch without needing to fiddle with task definitions themselves. P.S. I think it's great that Prefect is open source; being able to read through the source code is sometimes a great complement to the documentation!
k

Kevin Kho

08/19/2021, 5:03 AM
I agree with the approach 👍 And yes, the codebase is pretty legible so we have a lot of users subclassing things
👍 1