  • Tobias Heintz

    1 year ago
    Another conceptual question: how tightly is Prefect bound to Dask? The docs suggest that some critical features are only available when using Dask: for example, task parallelism and transparent data flow between tasks (which may be running on different machines). We are planning to run everything on ECS; will we still be able to use these features? Thanks a lot!
    3 replies (Tobias Heintz, Samuel Hinton, +1)
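    (A minimal sketch for the question above, assuming Prefect 0.14+, where run_configs and the executors module exist: the executor is configured on the flow itself, independently of where the agent runs, so a flow submitted to ECS can still use a Dask-based executor inside its container. The flow name and task are illustrative.)

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    from prefect.run_configs import ECSRun


    @task
    def hello():
        print("hi")


    with Flow(
        "ecs-example",
        run_config=ECSRun(),           # run the flow as an ECS task
        executor=LocalDaskExecutor(),  # task parallelism inside that container
    ) as flow:
        hello()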
  • Braun Reyes

    1 year ago
    Hey there, everyone. Curious if it is possible to map over a section of task dependencies. I have an E->L pair of tasks that I want to map over a list of tables. Is that possible? It seems that with map and apply_map it will still map over all the E's and then move on to mapping all the L's. I suppose this would be like mapping over a nested flow -- not a flow that is actually registered, but one that resides inside a registered flow. I was testing with this:
    from time import sleep
    
    from prefect import Flow, apply_map, task
    from prefect.executors import DaskExecutor
    
    
    @task()
    def test_1(x):
        # stand-in for the "E" (extract) step
        sleep(2)
        print(f"test_1 with {x}")
        return x


    @task()
    def test_2(x):
        # stand-in for the "L" (load) step
        sleep(2)
        print(f"test_2 with {x}")


    def micro_flow(x):
        # the unregistered "nested flow": for each x, test_2 waits only on
        # that same x's test_1 output
        test_1_task = test_1(x)
        test_2(test_1_task)
    
    
    with Flow(
        "example",
        executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 2}),
    ) as flow:
        apply_map(micro_flow, range(10))
    
    
    if __name__ == "__main__":
        flow.run()
        # flow.visualize()
    11 replies (Braun Reyes, Jeremiah)
  • rafaqat ali

    1 year ago
    Hi, I've been searching for a Prefect deployment mechanism for Azure Web App for Containers, but couldn't find any help. I have concluded that Prefect is not suitable for me, as I have to deploy on my Azure infra. I have created my own docker-compose file, but the Apollo URL is still pointing to localhost. Can anyone kindly guide me on how I can achieve this? Or should I look at other options like Dagster?
    2 replies (rafaqat ali, Jenny)
  • Daniel Black

    1 year ago
    Hello. I am getting a weird error when my task runs on schedule. All the task does is execute a query in a Postgres DB. It is scheduled to run daily but fails because "the table does not exist". I know the table exists, and if I restart the same task from the Prefect Cloud UI, it works just fine. Our deployment uses a Fargate task to run a Docker container. Any ideas as to why this error would occur?
    16 replies (Daniel Black, Jenny, +1)
  • Pedro Machado

    1 year ago
    Hi there. I am working on a project where we might use Kubernetes to execute flows. I don't have much experience with Kubernetes, but I've used the KubernetesPodOperator in Airflow, and one thing I noticed is that you can't see the logs until the task ends. How does logging work in Prefect when you use Kubernetes? Are logs streamed to Prefect Cloud in real time?
    3 replies (Pedro Machado, Dylan)
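    (A minimal sketch for the logging question above, assuming Prefect 0.x: records written through the logger in prefect.context are shipped to the backend while the flow is still running, so they appear in the Cloud UI before the Kubernetes job exits. The task name is illustrative.)

    import prefect
    from prefect import task


    @task
    def long_running_step():
        # the context logger forwards records to the Prefect backend
        # during the run, not only after the pod finishes
        logger = prefect.context.get("logger")
        logger.info("halfway done...")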
  • Asif Imran

    1 year ago
    Good morning! 👋 I am curious what the Prefect equivalent of Airflow's Sensors is. I see this helpful link [1] from Jeremiah -- essentially do the poke-sleep-poke yourself, if I understand correctly. Any changes since his post? Recently Airflow introduced the notion of Smart Sensors [2], which removes a fair bit of duplicated work (it's fairly typical for me to have several workflows all polling on the same S3 success file). I have similar worries that such polling will hog resources (e.g. going over the resources in my ECS cluster). [1] https://prefect-community.slack.com/archives/CL09KU1K7/p1602088074097600?thread_ts=1602087747.097500&cid=CL09KU1K7 [2] https://airflow.apache.org/docs/apache-airflow/stable/smart-sensor.html
    3 replies (Asif Imran, Jenny)
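    (A hedged sketch of the poke-sleep-poke pattern referenced above, assuming Prefect 0.x task retries; boto3 and the bucket/key names are illustrative. head_object raises if the object is missing, so the task fails and is retried on the configured delay, here up to roughly two hours of polling.)

    from datetime import timedelta

    import boto3
    from prefect import task


    @task(max_retries=24, retry_delay=timedelta(minutes=5))
    def wait_for_success_file(bucket: str, key: str):
        # raises botocore's ClientError while the key is absent,
        # which fails the task run and triggers the next retry
        boto3.client("s3").head_object(Bucket=bucket, Key=key)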
  • Ajith Kumara Beragala Acharige Lal

    1 year ago
    Hi Prefect experts, I'm facing an issue when invoking a script in a GitLab repo from my prefect-server. Can someone help me figure out the mistake in my code? The error:
    Failed to load and execute Flow's environment: GitlabGetError('404 Project Not Found')
    90 replies (Ajith Kumara Beragala Acharige Lal, Jim Crist-Harif)
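    (A hedged sketch of GitLab storage configuration for the thread above, assuming Prefect 0.x's prefect.storage.GitLab; all names are illustrative. A GitlabGetError 404 from the GitLab API usually points at the repo slug or the access token rather than the flow code itself.)

    from prefect import Flow
    from prefect.storage import GitLab

    storage = GitLab(
        repo="mygroup/myproject",  # full "namespace/project" slug, not a URL
        path="flows/my_flow.py",   # path to the flow file within the repo
        ref="master",              # branch or tag to pull at runtime
    )

    with Flow("gitlab-flow", storage=storage) as flow:
        pass  # tasks go here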
  • Diego Alonso Roque Montoya

    1 year ago
    Hi! Is there a way to send logs from a distinct machine into Prefect? I have tasks that spawn computers, so it would be nice if those child machines could log back to the main task.
  • S K

    1 year ago
    Need help here. I'm trying to do the following in Python, to check the flow state and cancel the flow if it is in a running state. How do I pass the values to "$flowRunId: UUID!"?
    import prefect
    from prefect import Client
    from prefect import task, Flow
    @task()
    def check_runs():
      c = Client()
      query = """
      query RunningFlowsName {
      flow(where: {name: {_eq: "flowstatechecktest"}}) {
        id
      } }  """
      print('======')
      print(c.graphql(query=query))
    
      query2 = """
      query RunningFlowsState {
      flow_run(where: {state: {_eq: "Running"}}) {
        state
      }  }  """
      print('======')
      print(c.graphql(query=query2))
    
      query3 = """
      mutation CancelFlowRun($flowRunId: UUID!) {
      cancel_flow_run(input: {flow_run_id: $flowRunId}) {
        state
      }  }  """
      c.graphql(query=query3)
    
    with Flow("flowstatechecktest") as flow:
        check_runs()
    flow.run()
    1 reply (S K)
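    (A hedged sketch of the missing piece in the question above, assuming Prefect 0.x, whose Client.graphql accepts a variables argument; the flow-run ID below is an illustrative placeholder.)

    from prefect import Client

    client = Client()
    mutation = """
    mutation CancelFlowRun($flowRunId: UUID!) {
    cancel_flow_run(input: {flow_run_id: $flowRunId}) {
      state
    }  }  """
    # GraphQL variables are passed as a dict keyed by name (without the $)
    client.graphql(
        query=mutation,
        variables={"flowRunId": "00000000-0000-0000-0000-000000000000"},
    )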
  • matta

    1 year ago
    Heya! So, I'm pulling from a database that goes down for ~75 minutes at random times. I set my tasks to have
    @task(max_retries=3, retry_delay=timedelta(minutes=30))
    but apparently the Zombie Killer doesn't like that? Looking through the logs, I see
    No heartbeat detected from the remote task; marking the run as failed.
    then
    Flow run is no longer in a running state; the current state is: <Failed: "Some reference tasks failed.">
    then
    Heartbeat process died with exit code -9
    then
    Failed to set task state with error: ClientError([{'message': 'State update failed for task run ID 43f52f19-fffb-4d16-8223-da4ffc5668b2: provided a running state but associated flow run 8c8fc810-eb3d-447c-ab70-76dd1dc2acaa is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 43f52f19-fffb-4d16-8223-da4ffc5668b2: provided a running state but associated flow run 8c8fc810-eb3d-447c-ab70-76dd1dc2acaa is not in a running state.'}}}],)
    7 replies (matta, Michael Adkins, +1)