• Dekel R
    6 months ago
    Hey, I have a complex data extraction flow with 2 main cases. Each case has its own tasks, and some tasks are relevant to both of them. The layout of the flow looks somewhat like this:
    with Flow('html_data_extraction__dev',
              storage=Docker(registry_url="us-central1-docker.pkg.dev/***/",
                             dockerfile="./Dockerfile"),
              schedule=daily_schedule, executor=LocalDaskExecutor(scheduler="processes")) as flow:
        mode = Parameter(name='mode', default=None)
        with case(mode, None):
            data_a = task_a()
        with case(mode, 'onboard'):
            data_b = task_b()

        data_c = merge(data_a, data_b)
        task_c()
        task_d()

        with case(mode, None):
            task_x()
        with case(mode, 'onboard'):
            task_y()
    Tasks a and b retrieve some data (each one is relevant to a different data source); tasks c and d are common (not in a “case”), doing X on the data (the data looks the same at this point); and then tasks x and y are again different, each one relevant to a different case. When running locally (Mac, flow.run()…) it all works as expected. When running on Prefect Cloud, all of the tasks get skipped (exactly the same code and credentials). Any idea what I’m missing here? I’m using “upstream_tasks” to run the tasks in a specific order when necessary. Thanks
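    For reference, a minimal self-contained sketch of the case/merge pattern described above (task names and logic here are illustrative, not the original flow):

    from prefect import Flow, Parameter, task
    from prefect.tasks.control_flow import case, merge

    @task
    def task_a():
        return "data from source A"

    @task
    def task_b():
        return "data from source B"

    @task
    def task_c(data):
        print(f"common step on: {data}")

    with Flow("case_merge_demo") as flow:
        mode = Parameter("mode", default=None)
        with case(mode, None):
            data_a = task_a()
        with case(mode, "onboard"):
            data_b = task_b()
        # merge resolves whichever branch actually ran, so tasks downstream
        # of it are not skipped along with the branch that didn't run
        data_c = merge(data_a, data_b)
        task_c(data_c)

    flow.run()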
    Dekel R
    Kevin Kho
    5 replies
• kevin
    6 months ago
    hey guys I have a flow in prefect cloud that’s showing this error
    {'_schema': 'Invalid data type: None'}
    after it is scheduled but before the first task gets executed. When I try to run this flow in a local environment it executes as expected. Any idea what could be causing this issue?
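    One way to narrow this down locally is to inspect the serialized form of the flow, since that payload is what gets schema-validated; a rough sketch (the flow contents are placeholders):

    import json
    from prefect import Flow, task

    @task
    def noop():
        return 1

    with Flow("schema-debug") as flow:
        noop()

    # A field that is unexpectedly None (e.g. a schedule or result setting)
    # should be visible in the serialized output.
    print(json.dumps(flow.serialize(build=False), indent=2, default=str))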
    kevin
    Kevin Kho
    15 replies
• Michael Smith
    6 months ago
    Hello, I am looking into Prefect 2.0 and am wondering how we might implement a cleanup function. In Prefect 1, from what I have seen, we could use the "all_finished" trigger when defining the task; I can't see the equivalent in v2. Any suggestions?
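    Since Prefect 2 flows are plain Python functions, one pattern that approximates an all_finished trigger is a try/finally around the flow body; a minimal sketch (task bodies are placeholders):

    from prefect import flow, task

    @task
    def do_work():
        raise RuntimeError("boom")

    @task
    def cleanup():
        print("cleaning up temporary resources")

    @flow
    def pipeline():
        try:
            do_work()
        finally:
            # runs whether do_work succeeded or failed, similar to an
            # all_finished trigger in Prefect 1
            cleanup()

    if __name__ == "__main__":
        pipeline()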
    Michael Smith
    Andrew Huang
    +1
    14 replies
• Myles Steinhauser
    6 months ago
    I stumbled into the ECS Capacity Provider issue (PR just recently fixed: https://github.com/PrefectHQ/prefect/pull/5411). I am curious when this might be released for Prefect 1.0 agents. This isn’t entirely a blocking issue for my team, but it is forcing us to work around it by selecting instance types via other methods.
    Myles Steinhauser
    1 reply
• Ken Nguyen
    6 months ago
    Hi! One of my flows uses GQL to pull logs from another flow. The log-pulling flow is directly triggered by the flow that produces the logs to be pulled. I continue to get this error for the log-pulling flow:
    {'errors': [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}], 'data': None}
    When re-running a flow at a later time, it runs successfully. Are there any docs that can provide info on API limits?
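    As a stopgap while the timeout behavior is unclear, the GraphQL call can be wrapped with retries and exponential backoff; a rough sketch using the Prefect 1 Client (retry counts and delays are arbitrary):

    import time
    from prefect import Client

    def graphql_with_backoff(query, retries=5, base_delay=2.0):
        client = Client()
        for attempt in range(retries):
            try:
                return client.graphql(query)
            except Exception:
                # transient 'Operation timed out' API errors are retried
                # with exponentially growing waits: 2s, 4s, 8s, ...
                if attempt == retries - 1:
                    raise
                time.sleep(base_delay * 2 ** attempt)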
    Ken Nguyen
    Anna Geller
    8 replies
• Harry Baker
    6 months ago
    is there a write-up/document about specifically what's new/different in Prefect 2.0? I don't really want to read through all of the docs from scratch, but a summary of the changes would be nice
    Harry Baker
    Kevin Kho
    +1
    4 replies
• Michael Smith
    6 months ago
    Hello, my Prefect 2.0 tests are going well... I see that Flow has a timeout_seconds parameter; is there any way to set task-level timeouts as well?
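    Newer Prefect 2 releases also accept timeout_seconds on the task decorator; a minimal sketch, assuming the installed version supports it (check the release notes for your version):

    import time
    from prefect import flow, task

    # Assumes timeout_seconds is available on tasks in this Prefect 2 version
    @task(timeout_seconds=10)
    def slow_task():
        time.sleep(60)  # exceeds the timeout and should be failed

    @flow(timeout_seconds=120)
    def pipeline():
        slow_task()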
    Michael Smith
    Kevin Kho
    +1
    6 replies
• Myles Steinhauser
    6 months ago
    Does Prefect 1.1 support retries on Flows? Specifically, I’m trying to work around some delayed scaling issues with ECS using EC2 instances (not ECS with Fargate tasks). Often, this failure is reported back to Prefect like the following error until Capacity Provider scaling has caught up again:
    FAIL signal raised: FAIL('a4f09101-0577-41ce-b8b0-31b84f26d855 finished in state <Failed: "Failed to start task for flow run a4f09101-0577-41ce-b8b0-31b84f26d855. Failures: [{\'arn\': \'arn:aws:ecs:us-east-1:<redacted>:container-instance/a8bc98b7c6864874bc6d1138f758e8ea\', \'reason\': \'RESOURCE:CPU\'}]">')
    I’m using the following calls to launch the sub-flows (as part of a larger script):
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    flow_a = create_flow_run(flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
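    In Prefect 1, retries are configured per task rather than per flow, and create_flow_run / wait_for_flow_run are themselves tasks, so retry settings can be attached at the call site via task_args; a sketch of that mechanism (retry values are illustrative):

    from datetime import timedelta
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("orchestrator") as flow:
        flow_a = create_flow_run(
            flow_name="A",
            project_name="myles",
            # the launch is retried while ECS capacity catches up
            task_args=dict(max_retries=3, retry_delay=timedelta(minutes=2)),
        )
        wait_for_flow_a = wait_for_flow_run(
            flow_a, raise_final_state=True, stream_logs=True
        )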
    Myles Steinhauser
    Anna Geller
    14 replies
• Alex Prokop
    6 months ago
    Hi there, I'm trying to implement some custom logic to handle retries. When a task times out or fails, I'd like to be able to spawn new tasks using the arguments passed to the original task. And if those tasks succeed, I'd like to be able to change the original task's state from failed to successful. Anyone have any advice? I've read the docs on state handlers and played around with them, but haven't quite found what I'm looking for. Sample code here:
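    For reference (this is not the poster's attached sample), a Prefect 1 state handler can return a replacement state, which allows turning Failed into Success when recovery logic works; a hypothetical sketch where fallback_work stands in for the respawned work:

    from prefect import task
    from prefect.engine import state

    def fallback_work():
        # hypothetical recovery logic standing in for the respawned tasks
        return 42

    def recover_on_failure(task_obj, old_state, new_state):
        # Prefect 1 state handlers receive (task, old_state, new_state)
        # and may return a replacement state
        if isinstance(new_state, state.Failed):
            try:
                result = fallback_work()
                return state.Success(result=result)
            except Exception:
                return new_state
        return new_state

    @task(state_handlers=[recover_on_failure], timeout=60)
    def flaky_task(x):
        return x * 2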
    Alex Prokop
    Kevin Kho
    8 replies
• Eric Mauser
    6 months ago
    hey everyone, I'm trying to parallelize a flow that consists of several AirbyteConnectionTasks. The flow runs fine on the ECS agent, but it runs each task in series. Is it possible to run the tasks in parallel when I'm generating them via a for loop, or is Prefect just running the tasks as they are generated? If so, how would this be done? Here is a toy code sample of what I'm working with:
    connections = ['conn1', 'conn2', 'conn3']

    with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
        for conn_id in connections:
            flow.add_task(AirbyteConnectionTask(
                airbyte_server_host=<Airbyte host>,
                airbyte_server_port=<airbyte port>,
                airbyte_api_version="v1",
                connection_id=conn_id,
            ))

    flow.run(executor=LocalDaskExecutor())
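    One common way to get parallelism here is to configure a single task instance and map it over the connection ids, which the LocalDaskExecutor can then fan out across processes; a sketch (host/port placeholders as above):

    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.tasks.airbyte import AirbyteConnectionTask

    sync = AirbyteConnectionTask(
        airbyte_server_host="<Airbyte host>",
        airbyte_server_port="<airbyte port>",
        airbyte_api_version="v1",
    )

    connections = ["conn1", "conn2", "conn3"]

    with Flow("airbyte_parallel") as flow:
        # one mapped child run per connection id, executed in parallel
        sync.map(connection_id=connections)

    flow.run(executor=LocalDaskExecutor(scheduler="processes"))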
    Eric Mauser
    Kevin Kho
    2 replies