• Chohang Ng

    1 year ago
    from prefect import Flow
    from prefect.executors import LocalExecutor
    from prefect.run_configs import LocalRun
    from prefect.tasks.prefect import StartFlowRun

    # weekday_schedule is defined elsewhere in the original script
    flow_1_flow = StartFlowRun(flow_name='flow_1', project_name='tester', wait=True)
    flow_2_flow = StartFlowRun(flow_name='flow_2', project_name='tester', wait=True)
    flow_3_flow = StartFlowRun(flow_name='flow_3', project_name='tester', wait=True)
    flow_4_flow = StartFlowRun(flow_name='flow_4', project_name='tester', wait=True)
    flow_5_flow = StartFlowRun(flow_name='flow_5', project_name='tester', wait=True)


    with Flow("main-flow", schedule=weekday_schedule, executor=LocalExecutor(),
              run_config=LocalRun()) as flow:
        flow_3 = flow_3_flow()
        flow_1_flow().set_upstream(flow_2_flow())
        step_2 = flow_4_flow(upstream_tasks=[flow_1_flow(), flow_3])
        step_3 = flow_5_flow(upstream_tasks=[step_2])

    flow.register(project_name='tester')
• Chohang Ng

    1 year ago
    I am confused about how upstream_tasks works. Here I want to execute the flows in this order: 2 > (1, 3) > 4 > 5. Does this look right? For some reason, only flows 1 and 2 got executed.
    Chris White
    2 replies
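One likely cause of the behavior described above: in Prefect 1.x, calling a task such as flow_1_flow() inside a Flow context creates a new copy of the task each time, so the copy passed to flow_4's upstream_tasks is not the one set downstream of flow_2. A minimal sketch of wiring that encodes 2 > (1, 3) > 4 > 5, assuming the same StartFlowRun tasks as in the snippet above — call each task exactly once and reuse the returned task:

    with Flow("main-flow", schedule=weekday_schedule, executor=LocalExecutor(),
              run_config=LocalRun()) as flow:
        run_2 = flow_2_flow()                          # flow 2 runs first
        run_1 = flow_1_flow(upstream_tasks=[run_2])    # flows 1 and 3 both
        run_3 = flow_3_flow(upstream_tasks=[run_2])    # wait on flow 2
        run_4 = flow_4_flow(upstream_tasks=[run_1, run_3])
        run_5 = flow_5_flow(upstream_tasks=[run_4])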
• Chohang Ng

    1 year ago
    If I include LocalAgent().start() below where my flow is registered in the code, will Prefect Cloud start up an agent to look for flows when they are scheduled to run?
    nicholas
    3 replies
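For reference, a minimal sketch of that pattern, assuming Prefect 1.x. Cloud itself never starts agents; LocalAgent().start() launches an agent on whatever machine runs the script, and that agent polls the API for scheduled runs:

    from prefect.agent.local import LocalAgent

    flow.register(project_name='tester')
    # Runs on this machine, not in the cloud; blocks here and polls
    # the backend API for scheduled flow runs to execute.
    LocalAgent().start()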
• Ismail Cenik

    1 year ago
    Hello, our flows have been running successfully for a month, with more than 200 successful runs. A new error started yesterday: of the last 6 runs, half did not complete and got stuck in one of our tasks, with no log information. According to the task logic, Kinesis Data Analytics must be checked every 60 seconds, but after a while the task just waits without doing anything. There is still an active flow run stuck in that condition, and I have not killed it so you can take a look. Could you please check it?
    Ismail Cenik
    Kevin Kho
    75 replies
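Not from the thread, but one way to keep a 60-second polling task like this from hanging silently is a task-level timeout; a hedged sketch, assuming Prefect 1.x (check_app_status is a hypothetical stand-in for the real Kinesis Data Analytics check):

    import time

    from prefect import task

    def check_app_status() -> bool:
        """Hypothetical stand-in for the real Kinesis Data Analytics check."""
        ...

    @task(timeout=3600)  # fail loudly if one polling session exceeds an hour
    def poll_kinesis_analytics():
        while not check_app_status():
            time.sleep(60)  # check every 60 seconds, per the task logic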
• Chohang Ng

    1 year ago
    Here Drop_tmp was executed right after create_tmp, and thus the dependency was lost; Drop_tmp is supposed to execute last according to my flow. What am I missing? If I don't set upstream and downstream dependencies, might tasks execute out of order? But when I look at my other flows, most of them are still fine.
    Kevin Kho
    12 replies
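A minimal sketch of the likely fix, assuming Prefect 1.x: tasks with no data dependency between them have no implied order, so a final cleanup task needs an explicit upstream edge to guarantee it runs last (the task names here are assumptions based on the message above):

    from prefect import Flow, task

    @task
    def create_tmp(): ...

    @task
    def load_tmp(): ...

    @task
    def drop_tmp(): ...

    with Flow("tmp-table-flow") as flow:
        created = create_tmp()
        loaded = load_tmp(upstream_tasks=[created])
        # Without this explicit edge, drop_tmp could be scheduled anywhere.
        drop_tmp(upstream_tasks=[loaded])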
• Chohang Ng

    1 year ago
    Does the LocalDaskExecutor use all the cores of the local computer where the flows are registered, so it will find all the independent flows and execute them in parallel, whereas the LocalExecutor just uses one? Correct?
    1 reply
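Broadly, yes: LocalDaskExecutor runs independent tasks in parallel on a single machine (defaulting to the number of available cores), while LocalExecutor runs tasks one at a time; note the parallelism applies on the machine where the flow runs, not where it is registered. A minimal sketch, assuming Prefect 1.x:

    from prefect.executors import LocalDaskExecutor, LocalExecutor

    # Parallel: independent tasks run on a local thread or process pool.
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
    # flow.executor = LocalExecutor()  # serial alternative: one task at a time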
• Daniel Davee

    1 year ago
    I'm using a Dask executor and KubernetesRun to run a flow on a Kubernetes cluster, but it doesn't seem to want to import my submodules when it runs on the cluster. Do I need to include them on the worker, or is there a way they can be uploaded with the flow?
    Kevin Kho
    27 replies
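One common route, sketched here with assumed paths and image names (assumes Prefect 1.x with Docker storage): bake the custom package into the image the flow executes in, so the workers on the cluster can import it.

    from prefect.storage import Docker

    flow.storage = Docker(
        registry_url="registry.example.com",        # assumption
        image_name="my-flow",                       # assumption
        files={"/local/path/my_pkg": "/my_pkg"},    # copy the package into the image
        env_vars={"PYTHONPATH": "$PYTHONPATH:/"},   # make it importable at runtime
    )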
• Joël Luijmes

    1 year ago
    Hey there! Is the newly introduced KV Store also coming to Prefect Server, or will this be a cloud-only feature?
    Joël Luijmes
    11 replies
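For context, at the time of this thread the KV Store was a Cloud-backed feature; its client API, assuming Prefect 1.x, looked roughly like this (the key and value are placeholders):

    from prefect.backend import get_key_value, set_key_value

    set_key_value(key="last_processed_date", value="2021-06-01")
    last = get_key_value(key="last_processed_date")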
• Johan Wåhlin

    1 year ago
    Does anyone have experience with registering Prefect flows from Azure DevOps pipelines to a Prefect server? The command 'prefect register flow --file flows/test_flow.py --project batteriskip --skip-if-flow-metadata-unchanged' keeps getting the error "requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200)" despite setting the environment variable PREFECT__SERVER__ENDPOINT="http://myserverip:myport". Same issue when using the variables PREFECT__SERVER__HOST and PREFECT__SERVER__PORT.
    nicholas
    2 replies
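Not confirmed from the thread, but one gotcha worth checking, sketched below (assumes Prefect 1.x; the endpoint is a placeholder): Prefect reads its configuration when the prefect module is first imported, so the backend and endpoint must be visible to the process before that import happens; otherwise the client falls back to localhost:4200.

    import os

    # Config is read when `prefect` is first imported, so set these first.
    os.environ["PREFECT__BACKEND"] = "server"
    os.environ["PREFECT__SERVER__ENDPOINT"] = "http://myserverip:myport"

    from flows.test_flow import flow  # importing the flow pulls in prefect

    flow.register(project_name="batteriskip")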
• ll

    1 year ago
    Are there any plans to add another type of executor for distributed environments? Our use case requires a large batch of tasks. Each task just makes an invocation of a C++ executable in an embarrassingly parallel fashion, i.e. if done serially by hand:
    # task 1
    ./my_cpp_executable file1.xyz

    ...

    # task 1000
    ./my_cpp_executable file1000.xyz
    Each task takes about 4-8 compute hours on 4 CPUs and ~32 GB of memory, and our scheduled workloads take up about 20,000-40,000+ compute hours per day. From what I can tell, the only supported strategy for running a large batch of embarrassingly parallel tasks right now is to use Dask. We have it working, but I feel Dask is more oriented to (i) interactive analysis workloads, (ii) pure Python tasks, and (iii) small jobs that fit onto local disk for each Dask node. It feels awkward to invoke a Dask executor for a one-line shell execution in a high-throughput, long-running, queued (num_tasks >> num_cluster_nodes) workload, and we would prefer not to have to support Dask on our infrastructure, as it adds a whole other set of things for our systems engineers to maintain. It seems more suitable if you supported the job queueing systems typically found in HPC environments, like SGE, Slurm, or HTCondor. I figure many of your target users in the fintech, scientific computing, and meteorological space will already have an SGE or Mesos cluster set up in their environment, but not a Dask cluster.
    11 replies
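One existing route that avoids running a standing Dask cluster, sketched under the assumption of Prefect 1.x plus the dask-jobqueue package (queue and resource values are placeholders): DaskExecutor can spin up an ephemeral cluster through an HPC scheduler such as Slurm or SGE and scale it against the backlog of queued tasks.

    from prefect.executors import DaskExecutor

    flow.executor = DaskExecutor(
        cluster_class="dask_jobqueue.SLURMCluster",  # or SGECluster, HTCondorCluster
        cluster_kwargs={"cores": 4, "memory": "32GB", "queue": "batch"},
        adapt_kwargs={"minimum": 1, "maximum": 100},  # scale workers with the backlog
    )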