• Vitalik

    1 year ago
    Hi – I have a basic flow of flows, with two flows inside it running sequentially. Tested in the Core environment, everything finished as expected. Running the same in Cloud immediately gives the error below. Interestingly, the first flow actually runs and finishes successfully (visible in that flow's Runs screen), yet both flows are marked as failed in the group flow. Feels like a bug, but I could be overlooking something. Any help will be greatly appreciated! The flow of flows was created as follows (based on the https://docs.prefect.io/core/idioms/flow-to-flow.html tutorial):

    flow1_trig = StartFlowRun(flow_name='flow1', project_name='prj', wait=True)
    flow2_trig = StartFlowRun(flow_name='flow2', project_name="prj", wait=True)
    with Flow("group") as flow_grp:
        flow2_trig.set_upstream(flow1_trig)

    Error:

    Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: 'query_root'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: 'query_root''}}}])
    Traceback (most recent call last):
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
        return run_method(self, *args, **kwargs)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
        run_link = client.get_cloud_url("flow-run", flow_run_id)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/client/client.py", line 887, in get_cloud_url
        tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/client/client.py", line 920, in get_default_tenant_slug
        res = self.graphql(query)
      File "/home/bitnami/anaconda3/lib/python3.8/site-packages/prefect/client/client.py", line 318, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: 'query_root'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: 'query_root''}}}]
    Vitalik
    haf
    +1
    4 replies
  • liren zhang

    1 year ago
    Hi all, I have written a sample flow which retrieves its code from GitHub and executes on a Docker agent living on an EC2 instance. The log shows that the code was retrieved correctly from GitHub, but execution failed with a strange error:
    Failed to load and execute Flow's environment: NameError("name 'Local' is not defined")
    I am not entirely sure where the name 'Local' is coming from. Here is my sample code for reference:
    from prefect import task, Flow
    from prefect.run_configs import DockerRun
    from prefect.storage.github import GitHub
    
    @task
    def say_hello():
        print("Hello, world!")
    
    with Flow(
        name="My first flow with Docker agent",
        storage=GitHub(
            repo="bbbb/aaaa-bdp",
            path="/PREFECT/hello_world_github.py",
            access_token_secret="GIT_ACCESS_TOKEN",
        ),
    ) as flow:
        say_hello()
    flow.run_config = DockerRun(labels=['prefect.aaaa.com'])
    
    #flow.run()
    flow.register("first_prefect_project")
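
    For reference, a minimal sketch of the same run config with an explicit execution image pinned (the image tag and label are assumptions, not values from the original post); a mismatch between the Prefect version inside the execution image and the version that registered the flow is one plausible source of errors like the one above:

    from prefect.run_configs import DockerRun

    # Hypothetical sketch: pin an image whose Prefect version matches the one
    # used to register the flow; the tag and label below are placeholders.
    flow.run_config = DockerRun(
        image="prefecthq/prefect:0.14.10",
        labels=['prefect.aaaa.com'],
    )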
    liren zhang
    Jim Crist-Harif
    8 replies
  • Adam Brusselback

    1 year ago
    Just wondering if anyone has used Prefect with HashiCorp Vault before? Any tricks to share?
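
    Not a Prefect-native integration, but one possible sketch is to read secrets with the hvac client inside a task; the Vault address, token handling, and secret path below are all assumptions:

    import hvac
    from prefect import task

    @task
    def get_db_password() -> str:
        # Hypothetical example: URL, token source, and secret path are placeholders.
        client = hvac.Client(url="https://vault.example.com:8200")
        client.token = "s.xxxxx"  # in practice, pull this from an env var or a Prefect Secret
        secret = client.secrets.kv.v2.read_secret_version(path="apps/my-flow")
        return secret["data"]["data"]["db_password"]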
    Adam Brusselback
    Dylan
    +1
    9 replies
  • Samuel Hinton

    1 year ago
    Hi team! Given that there's no support at the moment for flow concurrency in prefect-server, I'm trying to make sure that a large influx of late tasks doesn't swamp our server and bring it down. Is there a way to automatically cancel tasks that are more than a certain timedelta late? For example, if a task is 1 minute late, go ahead and run it; but if it's 10 hours late, cancel it.
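
    One possible sketch, not a built-in feature: a gating task at the start of the flow that compares the scheduled start time in prefect.context against a cutoff and ends the run with a Cancelled state. The 10-hour threshold mirrors the example above; everything else is an assumption:

    import pendulum
    import prefect
    from prefect import task
    from prefect.engine.signals import ENDRUN
    from prefect.engine.state import Cancelled

    @task
    def bail_out_if_too_late(max_lateness_hours: int = 10):
        # scheduled_start_time is populated in prefect.context for scheduled runs.
        scheduled = prefect.context.get("scheduled_start_time")
        lateness = pendulum.now("UTC") - scheduled if scheduled else None
        if lateness and lateness > pendulum.duration(hours=max_lateness_hours):
            # Marks this task Cancelled, so downstream tasks will not run.
            raise ENDRUN(state=Cancelled("Run started too long after its scheduled time"))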
    Samuel Hinton
    Jim Crist-Harif
    5 replies
  • Steve Aby

    1 year ago
    Good morning. I have a newbie, general, and maybe best-practice question that I am looking for opinions on. I have two main processes: process A and process B. Process B depends on process A completing, but each process is very distinct. In Prefect, is it best to 1) put all the tasks from both processes in a single flow and just run them sequentially; 2) have two separate flows with a dependency between the flows; or 3) does it not matter? Thanks, just looking for viewpoints.
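
    For what option 2 could look like, a minimal sketch of the flow-of-flows pattern using StartFlowRun; the flow and project names are placeholders and assume both processes are registered as their own flows:

    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    # Hypothetical names: assumes "process-a" and "process-b" are registered flows.
    run_a = StartFlowRun(flow_name="process-a", project_name="my-project", wait=True)
    run_b = StartFlowRun(flow_name="process-b", project_name="my-project", wait=True)

    with Flow("process-a-then-b") as parent_flow:
        run_b(upstream_tasks=[run_a()])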
    Steve Aby
    Jim Crist-Harif
    2 replies
  • Aaron Goodrich

    1 year ago
    Hey guys, pretty new to Prefect; probably an easy solution here, but I'm trying to move to Prefect from our current system of standalone dockerized ETL integrations. Normally I use docker-compose and mount a directory into the image so I can get any CSVs I generate back out of the container. In Prefect, I see I can throw my script into an image at runtime with
    flow.storage = Docker(
        path="my_flow.py",
        files={"/source/of/my_flow.py": "my_flow.py"},
        stored_as_script=True
    )
    but I don't see how I can get, say, my dynamically generated files back out. Any suggestions?
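
    Not necessarily the recommended pattern, but one sketch that sidesteps the container filesystem entirely is to have the task push its generated file to external storage before the container exits; the bucket, key, and the pandas DataFrame argument below are assumptions:

    import boto3
    from prefect import task

    @task
    def export_csv(df) -> str:
        # Hypothetical example: df is assumed to be a pandas DataFrame, and the
        # bucket/key are placeholders. Write the CSV inside the container, then
        # upload it somewhere that outlives the container.
        local_path = "/tmp/report.csv"
        df.to_csv(local_path, index=False)
        boto3.client("s3").upload_file(local_path, "my-output-bucket", "exports/report.csv")
        return "s3://my-output-bucket/exports/report.csv"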
    Aaron Goodrich
    Jim Crist-Harif
    2 replies
  • ale

    1 year ago
    Hey folks 🙂 Is there a way to enrich a task state with additional variables? I know this can be done when using signals, but I would like to add some custom variables to a task state even without using signals. The main reason for doing this is to consume those variables in state handlers.
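
    One workaround sketch, assuming the extra variables can ride along in the task's return value: a finished state's .result is visible to the handler, so custom fields can be read from there (the handler body and field names below are placeholders):

    import prefect
    from prefect import task

    def notify_handler(task_obj, old_state, new_state):
        # Hypothetical sketch: pull custom fields out of the task's return value
        # once it has finished successfully.
        if new_state.is_successful():
            extras = (new_state.result or {}).get("extras", {})
            prefect.context.get("logger").info("Custom vars: %s", extras)
        return new_state

    @task(state_handlers=[notify_handler])
    def load_rows():
        rows_loaded = 42  # placeholder work
        return {"value": rows_loaded, "extras": {"rows_loaded": rows_loaded}}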
    ale
    Jim Crist-Harif
    5 replies
  • Charles Liu

    1 year ago
    Hi! Can the image in
    flow.run_config = KubernetesRun(image="example/image-name:with-tag")
    be a remote image hosted in a private repo?
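
    A sketch of one common way to handle private registries, assuming the pull credentials already exist as a docker-registry secret in the namespace the agent deploys jobs into; the registry, image, and secret names are placeholders, and whether KubernetesRun exposes image_pull_secrets directly depends on your Prefect version:

    from prefect.run_configs import KubernetesRun

    # Hypothetical values: "regcred" is assumed to be a pre-created
    # docker-registry secret in the job's namespace.
    flow.run_config = KubernetesRun(
        image="registry.example.com/team/image-name:with-tag",
        image_pull_secrets=["regcred"],
    )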
    Charles Liu
    Jim Crist-Harif
    7 replies
  • Matthew Blau

    1 year ago
    Hello all, I currently have a Docker image that a flow runs, and it works well. However, I was hoping to learn whether, from within that image, I can at a minimum expose logging information to the Prefect UI, and possibly also have tasks contained in that image that a flow external to the image can pick up and run. Thank you in advance!
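
    For the logging half of this, a small sketch of the two hooks Prefect exposes inside task code: the context logger, and log_stdout=True for code that just prints (the task body is a placeholder):

    import prefect
    from prefect import task

    @task(log_stdout=True)
    def run_job():
        # Lines printed to stdout are captured and shipped to the Prefect UI
        # because of log_stdout=True.
        print("starting job inside the container")

        # The context logger also routes records through Prefect's log handlers.
        logger = prefect.context.get("logger")
        logger.info("job finished")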
    Matthew Blau
    Michael Adkins
    6 replies
  • Maria

    1 year ago
    Hi all, I am a bit unsure of how I should deal with Late Runs (e.g. when my agent was down for some time and is now deploying flow runs one after another to catch up, ignoring the schedule). This is a test scenario and the tasks are small and don't change the target state, but in the real world I usually don't want to "catch up": in an ETL use case, if I missed 5 hourly jobs, I would process those 5 hours of data at once rather than sending 5 separate jobs for execution. Should I design flows that take care of this logic (e.g. it will still be 5 jobs, but the last 4 will do nothing since I'll have checks in place), or is there another way to achieve this?
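
    One sketch of the "checks in place" approach, assuming each run derives its data window from scheduled_start_time and skips work that is already covered; the hourly window, already_processed check, and load_data call below are placeholders:

    import pendulum
    import prefect
    from prefect import task
    from prefect.engine.signals import SKIP

    @task
    def process_hour():
        # Work on the hour the run was scheduled for, not "now", so catch-up
        # runs stay idempotent.
        scheduled = prefect.context.get("scheduled_start_time", pendulum.now("UTC"))
        window_start = scheduled.start_of("hour")

        if already_processed(window_start):  # placeholder check against the target state
            raise SKIP(f"Hour {window_start} already processed")

        load_data(window_start, window_start.add(hours=1))  # placeholder ETL call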
    Maria
    Michael Adkins
    6 replies