@Marvin how do I get the status of a flow, i....
# marvin-in-the-wild
m
@Marvin how do I get the status of a flow, i.e. if it is running, scheduled, etc.? I am trying to build a flow that will run another flow if a condition is met. However, it should not run that flow if it is already running.
m
To check whether a flow run is running, scheduled, and so on, and to conditionally trigger another flow, you will need to understand flow run states and possibly use automations in Prefect. Here are some educational materials I gathered up for you:

- States in Prefect provide detailed information about the status of a flow or task run. You can learn whether a run is scheduled, has succeeded, or has failed, among other statuses (see the States documentation).
- Automations can be used to trigger actions based on certain conditions, such as a flow not completing within a certain time after it has started running (see the Automations documentation).

Unfortunately, the code example I found is just a basic flow definition and does not cover running another flow conditionally. However, you can implement this by checking the current state of the target flow's runs before triggering a new one within your custom flow logic. Below is a very simplified example of the concept in Python:
```python
import asyncio

from prefect import flow, get_client
from prefect.client.schemas.filters import (
    FlowFilter, FlowFilterName, FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.deployments import run_deployment

# Import paths follow recent Prefect 2.x/3.x releases; verify them for your version.

@flow
async def conditional_trigger_flow(target_flow_name: str, deployment_name: str):
    async with get_client() as client:
        # Retrieve runs of the target flow that are currently in a RUNNING state
        running_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[target_flow_name])),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.RUNNING]))
            ),
            limit=1,
        )

    # Only trigger the target flow if it is not currently running
    if not running_runs:
        # Assumes the target flow has a deployment; timeout=0 returns without waiting for the run
        await run_deployment(name=f"{target_flow_name}/{deployment_name}", timeout=0)

if __name__ == "__main__":
    asyncio.run(conditional_trigger_flow("my-target-flow", "my-deployment"))
```
Remember that this code only demonstrates the concept and has not been tested against your setup. You'll need to use the Prefect API client methods that match your Prefect version to retrieve the flow run state and trigger the flow run, and handle the required asynchronous calls. Please refer to the Prefect documentation for exact code samples and methods to achieve conditional triggering based on the flow run status.
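The check above treats only RUNNING as busy. Since the question also mentions scheduled runs, one option is to decide explicitly which states should block a new run. The sketch below is plain Python with no Prefect dependency. The state names mirror Prefect's `StateType` values, but `ACTIVE_STATES`, `should_trigger`, and the choice of which states count as blocking are assumptions made for illustration.

```python
# State names mirror Prefect's StateType values; this grouping is an assumption.
ACTIVE_STATES = {"PENDING", "RUNNING", "CANCELLING"}


def should_trigger(latest_state_types, block_on_scheduled=False):
    """Return True if none of the given state types should block a new run."""
    blocking = set(ACTIVE_STATES)
    if block_on_scheduled:
        # A deployment with a schedule usually has upcoming SCHEDULED runs,
        # so blocking on this state is opt-in.
        blocking.add("SCHEDULED")
    return not any(state.upper() in blocking for state in latest_state_types)
```

Inside the flow, the list passed in would be built from the state types of the target flow's recent runs, whichever way your Prefect version exposes them.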
m
@Marvin can I build a trigger that will start a flow if a new file has been uploaded to a ftp server?
m
To trigger a Prefect flow when a new file is uploaded to an FTP server, you will likely need a custom solution that monitors the FTP server and triggers the flow using Prefect automations or the Prefect API. The Prefect documentation suggests using triggers and automations for event-based flow runs, but does not provide a direct example for FTP server file uploads.

- You can set up an automation that responds to file write events (e.g., using `prefect.block.remote-file-system.write_path.called`), as shown in the Automations documentation. However, this might not directly apply to FTP servers.
- Alternatively, you can consider the Airbyte integration if Airbyte supports FTP servers as a data source. Airbyte is a tool for running ETL (extract, transform, load) syncs, and Prefect provides functionality to trigger flows based on Airbyte sync results.
- Deployment triggers and automations for flow runs can be configured in Prefect, which may be leveraged to monitor events, potentially including those from external systems like FTP servers (see the Deployment and Block-Based documentation).

For a custom FTP monitoring solution, you might set up a small script that runs periodically and checks the FTP server for new file uploads. When it detects a new file, it uses the Prefect API to trigger a flow run. Here's a conceptual outline of how you might implement this:
```python
from ftplib import FTP

from prefect import flow
from prefect.deployments import run_deployment

# The target Prefect instance comes from the PREFECT_API_URL setting,
# e.g. http://localhost:4200/api for a local server
# Assumed deployment of the flow to trigger, in "flow-name/deployment-name" form
TARGET_DEPLOYMENT = "my-target-flow/my-deployment"

# File names seen on earlier checks (in memory only; persist this between runs in practice)
seen_files = set()

def new_file_detected(files):
    # Compare the current listing with the names recorded on earlier checks
    new_files = set(files) - seen_files
    seen_files.update(new_files)
    return bool(new_files)

def trigger_prefect_flow(deployment_name):
    # Create a run of the deployment; timeout=0 returns without waiting for it to finish
    run_deployment(name=deployment_name, timeout=0)

@flow
def trigger_flow_if_new_file_detected(ftp_server, username, password, path):
    with FTP(ftp_server) as ftp:
        ftp.login(user=username, passwd=password)
        ftp.cwd(path)
        files = ftp.nlst()

    if new_file_detected(files):
        trigger_prefect_flow(TARGET_DEPLOYMENT)

if __name__ == "__main__":
    trigger_flow_if_new_file_detected('ftp.example.com', 'user', 'pass', '/path/to/files')
```
Remember that `new_file_detected` and `trigger_prefect_flow` need real logic that fits your setup, and that requests to the Prefect API must be properly authenticated. These suggestions should get you started with setting up a trigger based on FTP file uploads. Please consult the Prefect documentation and any specific packages or tools you decide to use for more detailed instructions.
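The idea of comparing the current listing with a cached list from the previous check can be sketched with the standard library alone. This is a minimal illustration, not a tested production approach: `find_new_files` and the JSON cache file are hypothetical choices, and the function only compares file names, so a file that is overwritten under the same name would not be detected.

```python
import json
from pathlib import Path


def find_new_files(current_files, cache_path):
    """Return names in current_files that were absent on the previous check."""
    cache = Path(cache_path)
    # Load the listing stored by the previous check, if there was one
    previous = set(json.loads(cache.read_text())) if cache.exists() else set()
    new_files = sorted(set(current_files) - previous)
    # Store the full current listing so the next check compares against it
    cache.write_text(json.dumps(sorted(current_files)))
    return new_files
```

The result of `ftp.nlst()` could be passed in as `current_files`. Note that on the very first check there is no cache yet, so every file is reported as new; you may want to skip triggering on that first pass.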