• Matthew Seligson

    2 months ago
    I have a running flow where a number of tasks are in a paused state. I’d like to programmatically approve all of them. What’s the best way to do that? I find that when I programmatically set all of their states to Resume, the behavior varies: sometimes the tasks stay in the Resume state and never start running. Any ideas why?
    Matthew Seligson
    Kevin Kho
    20 replies
  • Michal Baumgartner

    2 months ago
    Hi, I'm going through the v2 docs and was wondering if module storage from v1 (prefect.storage.Module) will be available in future releases. We're running a Prefect + Dask setup on k8s from a monorepo (i.e. the agent and Dask worker deployments run the same Docker image and tag), where flows are sourced from Python modules (and registered before agents start up during a new release). I'd therefore prefer not to use any object storage for storing flows or results if possible.
    Michal Baumgartner
    Michael Adkins
    +1
    7 replies
  • Adam

    2 months ago
    Hey folks, I have a hopefully simple question; I’m also a Prefect noob. I’m trying to test the caching/restart features. For simplicity, I have 5 CSVs that I’m downloading within 1 task. If the first CSV succeeds but the task then fails on the 2nd CSV, I would like to restart the task where it left off without re-downloading the 1st CSV. From my understanding, I need to leverage the results feature? I understand there’s a parameter for max retries, but when retrying, is any configuration needed to tell the task to start from the 2nd CSV? Also, if the max retries are exhausted, I’d like to be able to restart manually (presumably after fixing whatever bug stopped the 2nd CSV from downloading), again without having to re-download the 1st CSV.
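One way to get this behavior regardless of how retries are configured is to make the download step itself idempotent, so that a retry or a manual restart skips files that already finished. The sketch below is plain Python rather than Prefect API; `download_all`, `fetch`, and the `.part` suffix are hypothetical names chosen for illustration.

```python
from pathlib import Path
from typing import Callable, Iterable, List


def download_all(
    names: Iterable[str],
    target_dir: Path,
    fetch: Callable[[str], bytes],
) -> List[Path]:
    """Fetch each file unless a completed copy already exists in target_dir."""
    target_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for name in names:
        final_path = target_dir / name
        if not final_path.exists():
            # Write under a temporary name first, then rename, so a crash
            # mid-download never leaves a partial file that looks complete.
            tmp_path = final_path.with_name(final_path.name + ".part")
            tmp_path.write_bytes(fetch(name))
            tmp_path.replace(final_path)
        paths.append(final_path)
    return paths
```

If the second file fails, calling `download_all` again with the same directory only fetches the files that are still missing. Splitting the work into one task per CSV may be another option, since a failed file could then be retried on its own.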
    Adam
    Kyle McChesney
    +1
    6 replies
  • Alex Tam

    2 months ago
    So I've been using Prefect Cloud 2.0 on my personal account for a few weeks now, and we got our company account set up earlier this week. I can't figure out how to log in to beta.prefect.io using the company account. I can switch between my account and the company's on the normal prefect.io. Does it need to be enabled, and/or is it like that because it's still in beta?
    Alex Tam
    Kevin Kho
    2 replies
  • redsquare

    2 months ago
    Is the new 2.0 release coming today?
    redsquare
    1 reply
  • Ifeanyi Okwuchi

    2 months ago
    Hello all, I'm having difficulty figuring out what this error means:
    Unexpected error: KeyError(0)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
        executors.prepare_upstream_states_for_mapping(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 682, in prepare_upstream_states_for_mapping
        value = upstream_state.result[i]
    KeyError: 0
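One plausible reading, assuming the upstream task being mapped over returns a dict rather than a list: the traceback shows the engine indexing the upstream result by position (`upstream_state.result[i]`), and indexing a dict with `0` raises exactly this `KeyError: 0`, whereas a list that was too short would raise `IndexError` instead. The plain-Python sketch below reproduces that; `upstream_result` is a hypothetical stand-in for whatever the upstream task returns.

```python
# Hypothetical stand-in for the value returned by the upstream task.
upstream_result = {"a": 1, "b": 2}

try:
    upstream_result[0]  # positional indexing, as in the traceback
    error_name = None
except KeyError as exc:
    error_name = repr(exc)

# Converting the dict to a list of (key, value) pairs makes it indexable
# by position, which is what mapping over a result requires.
mappable = list(upstream_result.items())
first_item = mappable[0]
```

If this is the cause, returning a list (or a list of key/value pairs) from the upstream task should make the result mappable.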
    Ifeanyi Okwuchi
    Kevin Kho
    4 replies
  • Paul Stark

    2 months ago
    I’m trying to run the following code snippet
    snowflake_query = SnowflakeQuery(
        max_retries=cfg.retry_max,
        retry_delay=datetime.timedelta(seconds=cfg.retry_delay_seconds),
    )
    
    sql_queries = cfg.sql_query.split(';')
    
    with Flow(
        getenv('FLOW_NAME')
    ) as flow:
        run_snowflake_query = snowflake_query(
            account=PrefectSecret("SNOWFLAKE_ACCOUNT"),
            user=PrefectSecret(f"{environ['SNOWFLAKE_USER_VAR']}"),
            password=PrefectSecret(f"{environ['SNOWFLAKE_PASSWORD_VAR']}"),
            warehouse=cfg.snowflake_warehouse,
        ).map(query=sql_queries)
    sql_queries is being put into a list, since I can loop through it. Any thoughts on what I am doing incorrectly?
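Two things may be worth checking here, offered as a sketch rather than a confirmed diagnosis. First, in Prefect 1.x a mapped call is usually written on the task itself, along the lines of `snowflake_query.map(query=sql_queries, account=unmapped(...), ...)`, rather than chained after a normal call. Second, and independent of Prefect, a plain `split(';')` leaves an empty string at the end when the SQL text finishes with a semicolon, and that empty string would be submitted as a query. The helper below, with the hypothetical name `split_statements`, shows the second point using only the standard library.

```python
from typing import List


def split_statements(sql: str) -> List[str]:
    """Split on ';' and drop empty or whitespace-only fragments.

    Note: this naive split would still break on semicolons inside string
    literals or comments; it is only meant for simple statement lists.
    """
    return [part.strip() for part in sql.split(";") if part.strip()]


raw = "CREATE TABLE t (id INT); INSERT INTO t VALUES (1);"
naive = raw.split(";")           # last element is an empty string
cleaned = split_statements(raw)  # only the two real statements remain
```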
    Paul Stark
    Kevin Kho
    11 replies
  • Jake

    2 months ago
    We want to be able to turn any function into single-task flows. I’ve created a wrapper to do this (as pictured). Calling .run() on the flow that gets returned works fine, and registering the flow seems to be fine too (using k8s run). However, when we try to actually run it, we get the following error:
    Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'build_index\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    Am I missing something obvious?
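A likely explanation, assuming the flow is stored as a pickle: Python's `pickle` serializes an importable function by reference (module name plus function name), not by value, so the environment that unpickles the flow must be able to import that module, here `build_index`. The sketch below shows this with a standard-library function standing in for the wrapped one.

```python
import json
import pickle

# Pickle a function that lives in an importable module.
payload = pickle.dumps(json.dumps)

# The payload records where to import the function from,
# not the function's code.
contains_module = b"json" in payload
contains_name = b"dumps" in payload

# Loading works here only because `json` is importable in this process.
# In an environment without the module, this step would raise
# ModuleNotFoundError, as in the error message above.
restored = pickle.loads(payload)
```

If this is the cause, the module that defines the wrapped function would need to be installed in the image that executes the flow run.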
    Jake
    Kevin Kho
    7 replies
  • Cole Murray

    2 months ago
    Hey Prefect community, I’m working with Orion and trying to create a deployment from the API client. I’m able to create it from the CLI, but for my use case I would prefer calling the API client, for ease of use and response parsing. It appears we haven’t updated this yet on the docs page (https://orion-docs.prefect.io/concepts/deployments/#with-the-api). Playing around, I have something like this:
    from typing import List
    from uuid import uuid4
    
    from prefect.client import get_client
    from prefect.flow_runners import SubprocessFlowRunner
    from prefect.flows import Flow
    from prefect.orion.schemas.data import DataDocument
    from prefect.orion.schemas.schedules import CronSchedule
    from workflow_etl.flows.flow import hello_world
    
    async def main():
        prefect_client = get_client()
    
        schedules = []  # TODO
    
        for schedule in schedules:
            # `await` is only valid inside an `async def` function.
            flow_id = await prefect_client.create_flow(hello_world)
            # Assumed to be a coroutine as well, like create_flow.
            await prefect_client.create_deployment(
                flow_id=flow_id,
                name=schedule.id,
                schedule=CronSchedule(cron="0 * * * * *"),
                parameters={**schedule.workflow_params},
                tags={'owner_id': schedule.owner_id},
                flow_runner=SubprocessFlowRunner(),
                flow_data=...,  # UNKNOWN WHAT TO PUT HERE
            )
    I’m a bit stuck on what to put in the flow_data argument. Has anyone tried this, or does anyone have a link to a sample?
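A side note on the calling pattern, separate from the flow_data question: because the client methods are awaited, the calling function has to be a coroutine, and something has to drive it on an event loop. The sketch below shows that shape with the standard library only; `StubClient` and its `create_flow` method are hypothetical stand-ins, not the Prefect client.

```python
import asyncio
from typing import List


class StubClient:
    """Hypothetical stand-in for an async API client (not the Prefect client)."""

    async def create_flow(self, name: str) -> str:
        await asyncio.sleep(0)  # placeholder for a network round trip
        return f"flow-id-for-{name}"


async def main() -> List[str]:
    client = StubClient()
    flow_ids = []
    for name in ["hourly", "daily"]:
        # Each awaited call completes before the next one starts.
        flow_ids.append(await client.create_flow(name))
    return flow_ids


# asyncio.run creates the event loop, runs the coroutine, and closes the loop.
flow_ids = asyncio.run(main())
```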
    Cole Murray
    3 replies
  • Arnas Prancikevicius

    2 months ago
    Hello, a bit of an odd one, as I am not sure what I am doing wrong here. Effectively, a very simple task seems to be running forever when run via Prefect Cloud (using Local Agent, Prefect 1.0). The same flow when run on the machine that runs the local agent seems to work fine. Code:
    from prefect import task, Flow, Parameter
    
    @task
    def task_function(in_list_size=10, out_list_size=4):
    
        in_list = list(range(in_list_size))
        print(f">>> Input list: {in_list}")
        
        out_list = list(range(out_list_size))
        print(f">>> Output list: {out_list}")
        
        diff_list = [i for i in in_list if i not in out_list]
        print(f">>> Result list: {diff_list}")
        
        return diff_list
    
    
    with Flow("Test Flow") as flow:
    
        in_list_size = Parameter("input_list_size", default=10)
        out_list_size = Parameter("output_list_size", default=4)
        result = task_function(in_list_size=in_list_size, out_list_size=out_list_size)
    
    
    flow.register(project_name="tutorial")
    
    if __name__ == "__main__":
        flow.run()
    Trying to figure out where the problem is - guessing something is wrong on the Local Agent side?
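To help narrow this down, the task body can be checked in isolation as a plain function with no Prefect involved. If it returns immediately, the hang is more likely in how the agent picks up or loads the flow than in the task logic. `diff_lists` is a hypothetical name for the same computation as `task_function`, without the decorator and print statements.

```python
from typing import List


def diff_lists(in_list_size: int = 10, out_list_size: int = 4) -> List[int]:
    """Return the values in range(in_list_size) missing from range(out_list_size)."""
    in_list = list(range(in_list_size))
    out_list = list(range(out_list_size))
    return [i for i in in_list if i not in out_list]


result = diff_lists()
```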
    Arnas Prancikevicius
    Anna Geller
    +2
    12 replies