• Alfie

    2 years ago
    Hi folks, is the project feature only available in Prefect Cloud? I cannot create a project in Prefect Core.
    2 replies
  • Alfie

    2 years ago
    Hi folks, I have encountered another issue: I created a flow template and registered multiple flow instances with different parameters, but I cannot find a way to retrieve those parameter details back from the flows. Can anyone help with this?
    7 replies
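Registered parameter defaults can be read back from Prefect Server over its GraphQL API. The sketch below only builds the request payload; the query shape assumes the Hasura-generated schema of Prefect Server 0.12.x, where the `flow` table exposes a `parameters` field, and `"my-flow"` is a placeholder name.

```python
# Hedged sketch: read a flow's registered parameter definitions back
# from Prefect Server's GraphQL endpoint. Only the payload is built here.
import json

query = """
query {
  flow(where: {name: {_eq: "my-flow"}}) {
    name
    version
    parameters
  }
}
"""

# POST this to http://localhost:4200/graphql, or use
# prefect.Client().graphql(query) when a server is reachable.
payload = json.dumps({"query": query})
print(payload)
```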
  • Marwan Sarieddine

    2 years ago
    Question about setting Prefect environment variables: if I set the environment variable prior to running the script, it works fine:
    export PREFECT__LOGGING__LEVEL="ERROR"
    but if I set the environment variable from within the script, Prefect doesn't pick up on it:
    import os
    
    os.environ["PREFECT__LOGGING__LEVEL"] = "ERROR"
    
    ....
    
    flow.run()
    What am I missing here? (Is there a way to set the variable programmatically from within the script?)
    7 replies
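A likely explanation, assuming Prefect 0.x behavior: `prefect.config` is built once, at import time, from whatever `PREFECT__*` variables are in the environment, so assignments made after `import prefect` are not seen. A minimal sketch of the ordering that should make the in-script assignment work:

```python
import os

# Set the variable BEFORE importing prefect -- prefect constructs its
# global `prefect.config` when the module is first imported and does not
# re-read the environment afterwards.
os.environ["PREFECT__LOGGING__LEVEL"] = "ERROR"

try:
    import prefect  # config is built here and picks up the variable above

    print(prefect.config.logging.level)
except ImportError:
    pass  # prefect not installed here; the import ordering is the point
```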
  • karteekaddanki

    2 years ago
    From the docs on results https://docs.prefect.io/core/advanced_tutorials/using-results.html
    A result subclass that matches the storage backend of your prefect.config.flows.storage setting will automatically be applied to all tasks, if available; notably this is not yet supported for Docker Storage
    Does this mean I have to configure results individually for each task in a Docker-stored flow (with no global default), or that it is currently unsupported to have any results at all for tasks that run in a Docker environment?
    3 replies
  • Sven Teresniak

    2 years ago
    Hi. I have a distributed Dask setup (DaskExecutor). I learned that I need to persist flows (and results) to survive Prefect cluster restarts. I added an NFS volume to share the exact same data between Agent and Dask worker (these two only!) and added a Local storage to the flow, pointing to a path on the NFS. Now the questions:
    1. Is it a good idea (or necessary?) to have the flows and results accessible at the same mountpoint (local filesystem location, e.g. NFS-persisted ~/.prefect/flows) for both Agent and Dask worker? Or is it sufficient to persist the flow/result storage (make it durable across restarts), with no need for the agent to access the pickled flow results?
    2. What are the pros and cons of stored_as_script=True for a Local storage? When would I want to set this? Also, I do not understand the path parameter for Local storage: I set directory to a path on my NFS and now I see the pickled flows and results there. What is path for?
  • Michael Ludwig

    2 years ago
    We are using a FargateAgent and are currently experimenting with different memory and CPU settings for the flows it fires off. Our issue is that we can no longer change the resources, because the agent fails to recognize that it needs to recreate an ECS TaskDefinition when the memory setting changes. So even with the right values set, it starts the old task definition with the old memory settings:
    class ECSFargateAgent:
        def __init__(self, config: PrefectAgentConfig):
            self._env = config.env
            self._agent = FargateAgent(
                labels=[f"reco-{self._env}"],
                enable_task_revisions=True,
                launch_type="FARGATE",
                taskRoleArn=config.task_role_arn,
                executionRoleArn=config.execution_role_arn,
                cluster=config.ecs_cluster_arn,
                networkConfiguration={
                    "awsvpcConfiguration": {
                        "assignPublicIp": "ENABLED",
                        "subnets": config.subnets,
                        "securityGroups": [config.security_group],
                    }
                },
                cpu="1024",  # 1 vCPU
                memory="3072",  # 3 GB
                containerDefinitions=[
                    {
                        "logConfiguration": {
                            "logDriver": "awslogs",
                            "options": {
                                "awslogs-group": config.log_group,
                                "awslogs-region": "eu-west-1",
                                "awslogs-stream-prefix": "flows",
                            },
                        },
                    }
                ],
            )
    
        def run(self):
            """Start the agent"""
            self._agent.start()
    e.g. we switched memory from 16 GB to 3 GB, but the agent still fires off flows with 16 GB. Only deleting the old task definitions manually solves this for us. Has anybody seen something similar, or found a solution?
    2 replies
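One workaround matching the last sentence, automating the manual deletion: deregister the stale task-definition revisions via boto3, so the agent has to register a fresh one with the new cpu/memory values. This is not a Prefect API, and the family name below is a guess:

```python
def deregister_family(family: str, region: str = "eu-west-1") -> int:
    """Deregister every revision of an ECS task-definition family.

    Returns the number of revisions deregistered.
    """
    import boto3  # local import: only needed when actually called

    ecs = boto3.client("ecs", region_name=region)
    arns = ecs.list_task_definitions(familyPrefix=family, sort="DESC")[
        "taskDefinitionArns"
    ]
    for arn in arns:
        ecs.deregister_task_definition(taskDefinition=arn)
    return len(arns)

# e.g. deregister_family("prefect-task-my-flow")  # family name is a guess
```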
  • Sven Teresniak

    2 years ago
    # prefect run server -n flow2 -l
    Flow Run: http://localhost:8080/flow-run/e82df5c1-d197-46f1-9bb1-f18f9335e0f8
    TIMESTAMP                         LEVEL    MESSAGE
    2020-07-20T12:09:56.284609+00:00  INFO     Submitted for execution: PID: 835
    2020-07-20T12:09:57.237871+00:00  INFO   Beginning Flow run for 'flow2'
    2020-07-20T12:09:57.273447+00:00  INFO   Starting flow run.
    2020-07-20T12:09:57.273836+00:00  DEBUG  Flow 'flow2': Handling state change from Scheduled to Running
    2020-07-20T12:09:57.581024+00:00  INFO   Task 'task2': Starting task run...
    2020-07-20T12:09:57.724552+00:00  INFO   hello, task2
    2020-07-20T12:09:57.835204+00:00  INFO   Task 'task2': finished task run for task with final state: 'Success'
    2020-07-20T12:09:57.942237+00:00  INFO   Task 'flow1-runner': Starting task run...
    2020-07-20T12:09:58.145036+00:00  ERROR  Unexpected error: HTTPError('400 Client Error: Bad Request for url: http://localhost:4200/graphql')
                                             Traceback (most recent call last):
                                               File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
                                                 new_state = method(self, state, *args, **kwargs)
                                               File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 819, in get_task_run_state
                                                 value = timeout_handler(
                                               File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
                                                 return fn(*args, **kwargs)
                                               File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 445, in method
                                                 return run_method(self, *args, **kwargs)
                                               File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in run
                                                 flow_run_id = client.create_flow_run(
                                               File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 877, in create_flow_run
                                                 res = self.graphql(create_mutation, variables=dict(input=inputs))
                                               File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in graphql
                                                 result = self.post(
                                               File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 172, in post
                                                 response = self._request(
                                               File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 328, in _request
                                                 response = self._send_request(
                                               File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 265, in _send_request
                                                 response.raise_for_status()
                                               File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
                                                 raise HTTPError(http_error_msg, response=self)
                                             requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://localhost:4200/graphql
    2020-07-20T12:09:58.275483+00:00  INFO   Task 'flow1-runner': finished task run for task with final state: 'Failed'
    2020-07-20T12:09:58.302057+00:00  INFO   Flow run FAILED: some reference tasks failed.
    2020-07-20T12:09:58.302667+00:00  DEBUG  Flow 'flow2': Handling state change from Running to Failed
    During this flow run, the log/stdout of Apollo (which is listening on localhost:4200) prints:
  • Sven Teresniak

    2 years ago
    I cannot start a flow run from another flow. Can you please verify my minimal example? I have two flows. This one is registered as flow1:
    #!/usr/bin/env python
    # coding: utf-8
    
    import prefect
    from prefect import task, Flow
    from prefect.environments.storage.local import Local
    
    
    @task
    def task1():
        prefect.context.get("logger").info("hello, task1")
    
    
    with Flow("flow1", storage=Local(directory="/flows/.prefect/flows")) as flow:
        task1()
    
    if __name__ == "__main__":
        flow.register()
    And here is flow2:
    #!/usr/bin/env python
    # coding: utf-8
    
    import prefect
    from prefect import task, Flow
    from prefect.tasks.prefect import FlowRunTask
    from prefect.environments.storage.local import Local
    
    
    @task
    def task2():
        prefect.context.get("logger").info("hello, task2")
    
    
    with Flow("flow2", storage=Local(directory="/flows/.prefect/flows")) as flow:
        flow1_run = FlowRunTask(name="flow1-runner", flow_name="flow1", wait=True)
        task2.set_downstream(flow1_run)
    
    if __name__ == "__main__":
        flow.register()
    When I now run flow2, I get an error:
  • Sven Teresniak

    2 years ago
    2020-07-20T12:09:58.139Z {"message":"Variable \"$input\" got invalid value { flow_id: \"80de1b39-fed7-47ab-a5a5-fc22ac6f87d2\", idempotency_key: \"e82df5c1-d197-46f1-9bb1-f18f9335e0f8\" }; Field \"idempotency_key\" is not defined by type create_flow_run_input.","locations":[{"line":1,"column":10}],"extensions":{"code":"INTERNAL_SERVER_ERROR"}}
    Is this a bug? Every component of my Prefect cluster (v0.12.4) is running in K8s, but it is a static setup: every component is one container and everything runs together in one pod (so everything can communicate via localhost). I have a Dask worker pod running, and all other flows (those not calling other flows) have worked so far.
    6 replies
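The error text ("idempotency_key is not defined by type create_flow_run_input") suggests a version skew: the Core client sends an idempotency_key field that this server's schema does not yet know. Upgrading the server components would be the clean fix; as a stopgap, a hypothetical helper (everything below is an assumption, and "<flow-id>" stays a placeholder) could issue the mutation without the offending field:

```python
# Hedged workaround sketch: create a flow run via raw GraphQL, omitting
# the idempotency_key field the older server schema rejects.
def create_flow_run_without_idempotency_key(client, flow_id: str) -> str:
    mutation = """
    mutation($input: create_flow_run_input!) {
      create_flow_run(input: $input) {
        id
      }
    }
    """
    result = client.graphql(mutation, variables={"input": {"flow_id": flow_id}})
    return result.data.create_flow_run.id

# usage: create_flow_run_without_idempotency_key(prefect.Client(), "<flow-id>")
```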