• Ievgenii Martynenko

    5 months ago
    Hi, I'm extending the MySQL Execute and Fetch tasks to override the init method, while 'run' remains the same. I'm initializing the tasks outside of flow() and then just calling them, expecting the second task to depend on the first one. I'm definitely doing something wrong: get_time (line 5) is just initializing a task, not getting the actual output from the select query.
    fetch_task = TestMySQLFetchOneValue(connection_name='...', query="select now(6) as time_column", name="Fetch Time")
    execute_task = TestMySQLExecute(connection_name='...', name="Write Time")
    
    with Flow("Test Flow") as flow:
        get_time = fetch_task()  #fetch_task().run() works
        execute = execute_task(query="update test_run set run_time = '" + str(get_time.get('time_column')) + "'")
    
    flow.run()
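A minimal sketch of one way around this in Prefect 1: inside `with Flow(...)` tasks return placeholders rather than values, so the query string has to be built inside a task that runs at execution time. The `time_column` key comes from the question above; the helper name is ours.

```python
def build_update_query(row):
    """Format the UPDATE statement from the row dict returned by the fetch task."""
    return "update test_run set run_time = '{}'".format(row["time_column"])

# Wrapped as a task, the helper receives the *resolved* row at run time:
#
#     from prefect import task, Flow
#
#     build_query = task(build_update_query)
#
#     with Flow("Test Flow") as flow:
#         row = fetch_task()
#         execute_task(query=build_query(row))
```

The key difference from the snippet above is that the string concatenation happens inside a task, after the fetch result exists, instead of at flow-build time.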
    Ievgenii Martynenko
    Kevin Kho
    6 replies
  • Gaurav

    5 months ago
    Hello, I have successfully deployed a Kubernetes Prefect agent on an Azure Kubernetes Service cluster. I am trying to run a simple flow that uses a LocalDaskExecutor on the AKS virtual nodes. For this, I am using a custom job template for the pod, because it needs some customized node selectors and tolerations that Azure publishes. The following is a snippet of my job_template:
    job_template={
                "apiVersion": "batch/v1",   
                "kind": "Job",
                "spec": {
                    "template": {
                        "metadata": {
                            "labels": {
                                "execution-model": "serverless"
                            }
                        },
                        "spec": {
                            "containers": [
                                {
                                    "name": "flow"
                                }
                            ],
                            "nodeSelector": {
                                "execution-model": "serverless"
                            },
                            "tolerations": [
                                {
                                    "key": "<http://virtual-kubelet.io/provider|virtual-kubelet.io/provider>",
                                    "operator": "Exists"
                                }
                            ]
                        }
                    }
            }
    }
    However, the flow fails. When I ran kubectl get events, I noticed the following output:
    Warning   ProviderCreateFailed   pod/prefect-job-XXXXX-XXXXX   ACI does not support providing args without specifying the command. Please supply both command and args to the pod spec.
    Just some more information - I also ran the same flow successfully on an alternate deployment on AWS EKS Fargate, using an AWS Kubernetes agent. Any guidance is really appreciated 😃
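The event suggests that ACI-backed virtual nodes reject containers that set args without an explicit command. A sketch of extending the container entry in the custom job template accordingly; the command and args below are placeholders, not Prefect's documented entrypoint, and must match what the flow image actually runs.

```python
# Container entry for the custom job template. ACI requires both fields to be
# present when either is set.
container = {
    "name": "flow",
    "command": ["/bin/sh", "-c"],           # placeholder entrypoint (assumption)
    "args": ["prefect execute flow-run"],   # placeholder arguments (assumption)
}

job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {"template": {"spec": {"containers": [container]}}},
}
```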
    Gaurav
    Kyle McChesney
    +3
    30 replies
  • Caleb Ejakait

    5 months ago
    Hey guys! Quick question about the DockerRun config. I am running a Docker agent on an EC2 instance and have specified a custom image that I have added my flow dependencies to. But I keep getting a 404 image-not-found error (no such image: repo/image_name:tag). The same happens even with the Prefect core images for a simple test flow. Is there something I could have missed in the agent config?
    Caleb Ejakait
    Matthias
    +1
    5 replies
  • Michael Smith

    5 months ago
    Morning, I am testing a crash recovery scenario with Prefect 2. My workflow has a few steps, all of which do lengthy sleeps. I have an agent running on a Compute Engine instance (it won't be our final deployment architecture, but it is convenient for testing). I suspended the Compute Engine instance mid flow run. The log in the Prefect UI indicates "Crash detected!", but the TaskRun still shows as Running. After restarting the agent it looks like there is no automatic crash recovery, so in this scenario would we need to set up a flow timeout? Is there any way to resubmit a TaskRun, and do all the agents operate in this way?
    Michael Smith
    Anna Geller
    4 replies
  • Henning Holgersen

    5 months ago
    I'm looking at a scenario where a flow could be triggered a number of times simultaneously (via the API) - I think the extreme case is around 300x within a few seconds. I don't mind the tasks queuing up, but is there a point at which something will stop working on the Prefect side of things? I need to make sure the flows will indeed run - sooner or later.
    Henning Holgersen
    Anna Geller
    3 replies
  • Thomas Mignon

    5 months ago
    Hi guys, I'm currently facing an issue with Parameter: how can I get the value of a Parameter?
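In Prefect 1, a Parameter is itself a task: inside `with Flow(...)` you hold a Parameter object, and the actual value only exists at run time, inside the tasks that receive it. The helper below mimics how run-time values are resolved from defaults plus `flow.run(parameters=...)` overrides; the names in the commented sketch are illustrative.

```python
def resolve_parameters(defaults, overrides=None):
    """Mimic run-time parameter resolution: defaults merged with overrides."""
    values = dict(defaults)
    values.update(overrides or {})
    return values

# Prefect 1 usage (sketch):
#
#     from prefect import task, Flow, Parameter
#
#     @task
#     def greet(name):
#         print(name)   # <- here `name` is the value, not the Parameter object
#
#     with Flow("params") as flow:
#         name = Parameter("name", default="world")
#         greet(name)
#
#     flow.run(parameters={"name": "Thomas"})
```

The short answer to the question: pass the Parameter into a downstream task, and read its value there.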
    Thomas Mignon
    Anna Geller
    +1
    75 replies
  • Noam Gal

    5 months ago
    Hey, congrats on releasing Prefect 2.0 (beta). We were in the middle of adopting Prefect 1.0, and since none of our pipelines were production-ready we decided to move on and adopt Prefect 2.0 (Orion) right away. In our pipelines we were using <task>.run to call a task from another task (not sure if that was a kind of workaround), but now this feature seems to be gone. Is there any workaround for doing that in Prefect 2.0 (calling a task from another task)? Are there any guidelines for moving from Prefect 1.0 to Prefect 2.0? Thanks!
    Noam Gal
    Kevin Kho
    4 replies
  • Shuchita Tripathi

    5 months ago
    Hi. My scenario is to create a Prefect flow and then run it using an API call. I am able to run any already-created flow using POST calls (screenshot1 attached). When the POST call is invoked, the function prefect_flow is called. But I can't figure out how to create flows via POST calls (screenshot2 attached for the create-flow code without any reference to the API). I tried to encapsulate the whole task and flow inside one function (temp_prefect_run, line#9) and then call that function from the POST call, but I am getting an internal server error. Checking the detailed logs: if I do f.register(project_name), the error says ->
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000002A9EA91E100>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))
    My Prefect server is not at localhost. If I do client.register(flow, project_name), the error says ->
    raise ValueError("This flow has no storage to build")
    ValueError: This flow has no storage to build
    What is the best way to run POST calls which can create a new flow?
    Shuchita Tripathi
    Kevin Kho
    +1
    73 replies
  • Prasanth Kothuri

    5 months ago
    Hello, is there an example of how to add an S3 file as an attachment to the Prefect email task? I looked at the code and the current attachments param only supports local URIs. TIA
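One workaround, since the attachments param expects local paths: stage the S3 object in a temp directory first and attach the downloaded copy. The bucket, key, and EmailTask wiring below are placeholders; Prefect's email task has no built-in S3 attachment support as of this thread.

```python
import os
import tempfile

def local_attachment_path(key):
    """Return a fresh local path for an S3 object, keeping its basename."""
    return os.path.join(tempfile.mkdtemp(), os.path.basename(key))

# Download and attach (boto3 call sketched, placeholders throughout):
#
#     import boto3
#     path = local_attachment_path("reports/summary.pdf")
#     boto3.client("s3").download_file("my-bucket", "reports/summary.pdf", path)
#     EmailTask(...).run(attachments=[path])
```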
    Prasanth Kothuri
    Kevin Kho
    +1
    6 replies
  • Atul Anand

    5 months ago
    I have implemented distributed Dask with Prefect. The task works perfectly fine with LocalDaskExecutor, but when I tried to run it on distributed Dask it gave an error. I have a docker-compose Dask setup with one scheduler and multiple workers. Everything is up and works perfectly in the local case.
    Atul Anand
    Kevin Kho
    11 replies