prefect-community

    E Li

    02/02/2022, 4:08 PM
    Hi, my task is mapped into some child tasks. If some of the child tasks fail in a run, is there a way to skip the successful ones and re-run only the failed ones? Can this be achieved with state handlers? Thanks!
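    A minimal sketch of the usual prerequisite (an assumption, not from this thread): in Prefect 1.x, restarting a flow run can skip already-successful mapped children only if task results are persisted, e.g. with LocalResult:
    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    @task(result=LocalResult(dir="/tmp/prefect-results"))  # persist each child's result
    def process(item):
        return item * 2

    with Flow("mapped-restart-example") as flow:
        process.map([1, 2, 3])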

    Jason Motley

    02/02/2022, 5:26 PM
    If I set up a "flow of flows" orchestration and schedule it, and the flow of flows all runs before the scheduled runs for the individual flows, will the flows just run again (unnecessarily)?

    An Hoang

    02/02/2022, 5:38 PM
    I have a flow_A with task1, task2... task10. Now I want to construct flow_B with task1, task2... task20, where task11_result = task11(task10_result). How do I add the tasks of flow_A as upstream tasks for flow_B? I know I can do create_flow_run, but I do not want monitoring for task1...task10 in flow_B. Kind of like:
    with Flow("flow_A") as flow:
        flow_A_param1 = Parameter("flow_A_param1")
        ...
        flow_A_param_n = Parameter(...)
        ...
        task1 = ....
        ...
        task10_result = task10(...)
    
    with Flow("flow_B") as flow:
        flow_A_param1 = Parameter(...)
        ...
        flow_A_param_n = Parameter(...)
        
        flow_A_result = flow_A.run(parameters = {"flow_A_param1": flow_A_param1,...., "flow_A_param_n": flow_A_param_n}) #dont want monitoring for these tasks
        task10_result = flow_A_result.result[flow_A.get_task(task10)]
        task11_result = task11(task10_result)
        ...
        task20_result = task20(...)
    where `flow_B`'s parameters are just `flow_A`'s parameters. But in the example above I always have to repeat the parameters of flow_A in flow_B and keep them both up to date with each other. Is there a better way to do this?
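    One possible direction (a hedged sketch, not confirmed in this thread): Prefect 1.x's Flow.update() copies another flow's tasks and edges, including its Parameters, into the calling flow, so they don't have to be redeclared:
    from prefect import Flow, task

    @task
    def task10():
        return 10

    @task
    def task11(x):
        return x + 1

    with Flow("flow_A") as flow_a:
        t10 = task10()

    with Flow("flow_B") as flow_b:
        t11 = task11(t10)  # reusing the task object from flow_A wires the edge

    flow_b.update(flow_a)  # merge flow_A's tasks and edges into flow_B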

    Chris Reuter

    02/02/2022, 6:00 PM
    👋 Hi everyone! We're working with Gradient Flow to collect some data on workflow orchestration use cases and features. We want to know what's important to our community as we move forward. Please consider taking this 10-question survey. All responses are anonymous, of course.

    Devin Flake

    02/02/2022, 7:51 PM
    Is there any chance that Prefect could release an arm64 image?

    Nelson Griffiths

    02/02/2022, 9:58 PM
    I am running into some unexpected behavior with extra loggers in Prefect Cloud. When I add my extra loggers to the config and run my flow with a local agent, everything works as expected. But when I switch to the DockerRun config and Bitbucket storage, my extra loggers no longer get passed to the Cloud logs. Is there something extra I need to do when moving from a local agent to a Docker agent to get my logs passed through?
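    A hedged guess at the cause: with a local agent the flow run can see the local config.toml, but a DockerRun flow executes inside the container, which never sees that file. One sketch of a workaround (image and logger names are placeholders) is to ship the setting as an environment variable on the run config:
    from prefect.run_configs import DockerRun

    run_config = DockerRun(
        image="my-org/my-flow:latest",  # placeholder image
        env={"PREFECT__LOGGING__EXTRA_LOGGERS": '["my_library"]'},
    )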

    jspeis

    02/03/2022, 1:43 AM
    I'm using pylint, and it seems that when I use the upstream_tasks kwarg (e.g. as in the example below) I get Unexpected keyword argument 'upstream_tasks' in function call (unexpected-keyword-arg)... is there a way I can get pylint to recognize this is OK (other than disabling the check)?
    @task
    def add_one(val):
        return val + 1
    
    with Flow("Add One") as flow:
        p = Parameter("param")
        first = add_one(p)
        second = add_one(p, upstream_tasks=[first])
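    One narrower option than disabling the check globally (a sketch using pylint's standard per-line suppression) is to silence only the offending call:
    second = add_one(p, upstream_tasks=[first])  # pylint: disable=unexpected-keyword-arg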

    Sumit Kumar Rai

    02/03/2022, 6:47 AM
    How can I fit GitLab's sheetload script into a Prefect workflow? Is there a similar example for this use case?

    Suresh R

    02/03/2022, 10:22 AM
    Hi! I want to make sure that only one run of a flow executes at a time. Is there a way to restrict parallel runs of a flow?

    Muddassir Shaikh

    02/03/2022, 12:20 PM
    Hi, I have a task function that retries multiple times to check whether a file is present and, if successful, starts the downstream task. The same task receives different parameters: the ones in orange are working correctly (going into retries), but a few of them are in yellow (going into a Submitted state) and causing the downstream task to fail. What is this Submitted state, and why is it happening?

    Ievgenii Martynenko

    02/03/2022, 2:26 PM
    Hi, maybe someone can advise on the best option in this case. 1) We have a Linux Docker image as the Prefect Agent, with mounted S3 storage acting as primary storage for flows, configs, and some resources used by tasks. 2) Each flow is executed in a separate container using DockerRun(image=prefecthq/prefect:0.15.13-python3.8, host_config=host_config), so we have isolated flow containers inside the agent container :). host_config is used to mount the agent's S3 into the flow runtime, so that the flow, when running, can pick up the configs and resources mounted to the agent. The tricky part is how to pass connection strings into the flow container. 1) I thought we could pass them as arguments of DockerRun(..., environment=environment) according to https://docker-py.readthedocs.io/en/stable/api.html#module-docker.api.container, but we need to prepare the list, which means it will be plain text. Not good. 2) Another option is to define a dictionary in the flow and replace the variables in the CI/CD tool during deployment, but then it will be stored as plain text in storage inside the flow. This might be an option if the serialized Prefect flow can be encrypted. 3) ??? Maybe some other ideas?
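    One further option (a hedged sketch, not from this thread): resolve the values at runtime as Prefect Secrets, so neither the serialized flow on storage nor the container definition carries them in plain text. DB_CONNECTION_STRING is a hypothetical secret name:
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def use_connection(conn_string):
        ...  # open the connection at runtime; nothing is persisted with the flow

    with Flow("secret-sketch") as flow:
        conn = PrefectSecret("DB_CONNECTION_STRING")  # hypothetical secret name
        use_connection(conn)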

    Arnaldo Russo

    02/03/2022, 2:44 PM
    Hi there !

    Arnaldo Russo

    02/03/2022, 2:53 PM
    I have noticed that my flow gets Permission denied when writing to the home directory. Does anyone know about this error?
    File "/usr/lib/python3.9/os.py", line 225, in makedirs
        mkdir(name, mode)
    PermissionError: [Errno 13] Permission denied: '/MboiTui'
    I have changed the permissions on my home '/MboiTui' to 777 with chmod.

    Caleb Ejakait

    02/03/2022, 3:10 PM
    Hi there, I'm trying to save a CSV to GCS using pandas in a Prefect Cloud flow. I have set the GCP_CREDENTIALS key on Prefect Cloud, which I am beginning to realize is for Prefect's GCP tasks only, and I am getting the GCS 401 error that suggests the credentials are missing. Is there a way to pass the credentials saved on Prefect Cloud into the pandas operation, or is there another way using Prefect tasks that could work?
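    A hedged sketch of one way this could work (assuming pandas>=1.2 with the gcsfs package installed; the bucket path is a placeholder): fetch the saved GCP_CREDENTIALS secret as a task input and hand it to pandas via storage_options instead of relying on ambient credentials:
    import pandas as pd
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def save_csv(creds: dict):
        df = pd.DataFrame({"a": [1, 2]})
        # gcsfs accepts the service-account info dict as its token
        df.to_csv("gs://my-bucket/out.csv", storage_options={"token": creds})

    with Flow("pandas-to-gcs") as flow:
        save_csv(PrefectSecret("GCP_CREDENTIALS"))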

    Talmaj Marinč

    02/03/2022, 3:44 PM
    I tried to get the logs of my run to analyse failed runs:
    prefect get logs -n name_of_the_run
    but they seem to be incomplete. I get only 2636 lines for less than an hour of the flow. The logs are there, since I see them in the UI. Any suggestions?

    Matan Drory

    02/03/2022, 5:35 PM
    Hello. We are working with prefect[aws]==0.15.3. We have a flow that generates a large number of batch jobs, submits them, splits them into chunks of 100, and generates an AWS waiter object per 100. We have one very large job with over 450 batch calls. This job seems to be stuck even though it progressed to the end. When tracking the progress we can see that a few tasks are stuck as Mapped even though all child tasks are done, i.e. 472 successful definitions were created in create_ive_analysis, then all of them were submitted and chunked in submit_jobs_analysis, and then all 5 AWSClientWait jobs were done (we call it with map on the chunked job IDs). The parent block is still in Mapped mode. Also, sometimes an AWSClientWait task fails and the job doesn't fail; it just stays there (again, this is with a mapped AWSClientWait). Wait code:
    wait_res = AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=batched_waits)
    where batched_waits is created by:
    @task
    def submit_jobs_and_batch_job_awaits(jobs: List, batched_num=BATCH_CLIENT_WAIT_MAX_SIZE):
        submitted_jobs = [BatchSubmit().run(**job) for job in jobs]
        waits = []
        for i in range(0, len(submitted_jobs), batched_num):
            waits.append(
                {
                    'jobs': submitted_jobs[i : i + batched_num],
                    'WaiterConfig': {
                        'Delay': 10,
                        'MaxAttempts': 10000,
                    },
                }
            )
    return waits
    What could cause that?

    Carlos Paiva

    02/03/2022, 6:04 PM
    Hi guys, how can I get the working directory or file path for DockerRun with GitHub storage? I need to import some utilities from my repo, but I keep getting import errors.

    Payam K

    02/03/2022, 9:37 PM
    Hello, I am running an Azure DevOps pipeline with service connection set up to access my AWS account. As a pipeline task, I run a sagemaker processing job that includes a Prefect flow.
    - task: AWSShellScript@1
                  inputs:
                    # awsCredentials: 'xxxxx'
                    regionName: 'xxx'
                    scriptType: 'inline'
                    inlineScript: python3 cli.py -remote S3 -p "['a','2020-09-01', '2020-09-02']"
    I get this error:
    [2022-02-03 21:29:07+0000] ERROR - prefect.S3Result | Unexpected error while reading from S3: TypeError('expected string or bytes-like object')
    Traceback (most recent call last):
      File "/miniconda3/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 142, in exists
        self.client.get_object(Bucket=self.bucket, Key=location.format(**kwargs))
      File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 391, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 692, in _make_api_call
        api_params, operation_model, context=request_context)
      File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 738, in _convert_to_request_dict
        api_params, operation_model, context)
      File "/miniconda3/lib/python3.7/site-packages/botocore/client.py", line 770, in _emit_api_params
        params=api_params, model=operation_model, context=context)
      File "/miniconda3/lib/python3.7/site-packages/botocore/hooks.py", line 357, in emit
        return self._emitter.emit(aliased_event_name, **kwargs)
      File "/miniconda3/lib/python3.7/site-packages/botocore/hooks.py", line 228, in emit
        return self._emit(event_name, kwargs)
      File "/miniconda3/lib/python3.7/site-packages/botocore/hooks.py", line 211, in _emit
        response = handler(**kwargs)
      File "/miniconda3/lib/python3.7/site-packages/botocore/handlers.py", line 238, in validate_bucket_name
        if not VALID_BUCKET.search(bucket) and not VALID_S3_ARN.search(bucket):
    TypeError: expected string or bytes-like object
    The same task runs fine when I just run it on an Azure DevOps agent:
    - task: AWSShellScript@1
                  inputs:
                    # awsCredentials: 'xxxxx'
                    regionName: 'xxx'
                    scriptType: 'inline'
                    inlineScript: python3 cli.py -local S3 -p "['a','2020-09-01', '2020-09-02']"
    has anyone had this issue before?

    Moises Vera

    02/03/2022, 11:20 PM
    Hello, hope this is the right channel for this question...
    • I already have my tasks working on my local machine (right now I run them manually), and I want to start deploying these tasks to a production environment and Prefect Cloud.
    • This is a common ETL process, but it stores files locally to avoid wasting too much RAM between tasks, and because the requests I'm making to extract data are really heavy.
    • So, in the extract process, after I finish a smaller task, I store the result in a local file.
    • All of this works on my local machine with no issue using flow.run().
    • But when I register the flow for Prefect Cloud the issues start:
    ◦ right now I'm dealing with FileNotFoundError: [Errno 2] No such file or directory: 'data/file.csv'
    ◦ this is in Prefect Cloud
    ◦ mine is a basic config: a local agent on a server
    ◦ my code is in a directory where I want to create the file within a subdirectory
    ◦ the offending line of code is a pathlib.mkdir call
    I'm sure there is something I don't understand about Prefect, because I cannot manipulate the local filesystem on the server with Prefect Cloud. Any ideas?
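    A hedged guess at the cause, with a sketch: with a local agent, relative paths like 'data/file.csv' resolve against the agent process's working directory on the server, not against the flow file's directory, so anchoring writes to an absolute base (the path below is hypothetical) usually avoids the FileNotFoundError:
    from pathlib import Path
    from prefect import task

    BASE_DIR = Path("/opt/etl-data")  # hypothetical absolute location on the server

    @task
    def extract_chunk():
        out_dir = BASE_DIR / "data"
        out_dir.mkdir(parents=True, exist_ok=True)  # create parents, tolerate reruns
        (out_dir / "file.csv").write_text("col\n1\n")  # stand-in for the real extract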

    Farid

    02/04/2022, 12:45 AM
    Hi there, I'm trying to set up a demo flow package inspired from here; the idea is to keep GitLab as the source of truth and as the STORAGE for the flows, and have the GitLab CI/CD register the flows to Prefect Cloud. However, I receive an error when I try to run the dummy flow taken from the official docs:
    from prefect import task, Flow
    from prefect.storage import Gitlab
    
    
    @task(log_stdout=True)
    def hello_task():
        text = f"hello World"
        print(text)
        return text
    
    
    with Flow(
        "hello-flow",
        storage=Gitlab(
            repo="predicthq/data-engineering/prefect-server",
            path="flows/hello-world/flow_hello_flow.py",
            access_token_secret="",
        ),
    ) as flow:
        hello_task()
    The error:
    (.venv) ➜  prefect-server git:(main) ✗ python flows/hello-world/flow_hello_flow.py
    Traceback (most recent call last):
      File "/Users/farid/WorkSpace/Github_Repos/dataobs-poc/prefect-server/flows/hello-world/flow_hello_flow.py", line 2, in <module>
        from prefect.storage import Gitlab
    ImportError: cannot import name 'Gitlab' from 'prefect.storage' (/Users/farid/WorkSpace/Github_Repos/dataobs-poc/prefect-server/.venv/lib/python3.9/site-packages/prefect/storage/__init__.py)
    Looking at the source code, I don't see anything that could cause a dependency cycle. Any idea what could be causing this? Needless to say, I have installed the gitlab extra:
    pip install 'prefect[gitlab]'

    davzucky

    02/04/2022, 2:47 AM
    With Orion, will we have the option to say we want a flow to run all the time? The idea would be for that flow to listen for events and start other child flows. However, this flow would be in a continuous loop. The main question is how to release a new version cleanly, since the current event loop would need to be stopped and a new instance started.

    Christopher

    02/04/2022, 7:40 AM
    Hi, I'm looking for a workflow management solution that we can use now to replace our homebrew scripts. I've seen Orion, which looks like it would tick our boxes, but I'm aware it's not ready for production use yet. So I have two questions about Prefect Core:
    1. Is it possible to have cyclic workflows? I have a flow which transforms input records into a smaller number of output records. I'd like to then process the output records with the same flow again, until it outputs no records.
    2. Is it possible to have event-driven flows? We've got records coming into a queue which I'd ideally process when the queue reaches a certain size. I have read the PINs and suspect the answer is no, but just wanted to check. I can work around it by periodically scheduling a flow that checks the size of the queue and exits if it's too small (see the sketch below).
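    For the periodic-check workaround in question 2, a minimal sketch (get_queue_size is a hypothetical stand-in): raising Prefect 1.x's SKIP signal from the first task skips the downstream work cheaply when the queue is still too small:
    from prefect import Flow, task
    from prefect.engine.signals import SKIP

    def get_queue_size():
        return 0  # stand-in for a real queue-size check

    @task
    def check_queue_size():
        size = get_queue_size()
        if size < 100:
            raise SKIP(f"Only {size} records queued; nothing to do.")
        return size

    @task
    def process(size):
        ...

    with Flow("queue-poller") as flow:
        process(check_queue_size())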

    Thomas Fredriksen

    02/04/2022, 12:06 PM
    Hello everyone. We have a prefect-server instance running in our cluster, and we were wondering what would be the best way to organize the projects. Is it possible to create a directory-like structure of the form {DOMAIN}/{PROJECT_NAME} (or longer) or similar? For example iot_devices/site-temperature-management

    Amanda Wee

    02/04/2022, 12:23 PM
    I'm too lazy to write a pull request, so I shall point out here that the link to the docs in README.md is broken: it links to /core/, which means accessing it from GitHub causes it to link to https://github.com/PrefectHQ/prefect/blob/master/core instead of https://docs.prefect.io/core/. I'd suggest linking directly to https://docs.prefect.io/ as per the previous behaviour. @Anna Geller git blame says you made the change 😛

    Amar Eid

    02/04/2022, 12:40 PM
    Hey everyone, I am Amar. I am new to Prefect and happy to be part of the community. Looking forward to learning from all of you. 👋

    Konstantin

    02/04/2022, 1:26 PM
    Hello everyone, can someone tell me how this works? I do not understand it. Is there an example available? Here is the link

    Patrick Alves

    02/04/2022, 2:03 PM
    Hi there, I have this task with a trigger:
    @task(trigger=any_failed)
    def alert_email(email, message):
        sendmail(email, '[Prefect] ADCheck - [FAILED]', message)
    
    with Flow("AD Check") as flow:
    
        # Parameters
        admin_email = Parameter("admin_email", default="<mailto:patrick@xyz.br|patrick@xyz.br>")
    
        # Tasks
        message = "some message"
    
        try:
            ad_conn = connect_active_directory(ad_server, ad_user, ad_pass)
    
        except Exception as e:
            message = f'Erro ao executar workflow: {e}'
    
        # If all tasks are successful
        notify_email(admin_email, message)
    
        # If any task fails
        alert_email(admin_email, message)
    When the task fails, I would like to get the exception error and save it to the message variable, but I am getting
    Trigger was "all_successful" but some of the upstream tasks failed
    @Anna Geller, any tips? I've tried to use state_handlers, but I need to be able to change the parameters of the alert function for each workflow (email, subject, etc.), so triggers seem better. But I could not get the error.
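    A hedged sketch of one pattern that may help (this relies on an assumption about Prefect 1.x that is not confirmed in this thread: a failed task's result is the exception it raised, and a downstream task with an any_failed trigger receives it as a regular input):
    from prefect import Flow, task
    from prefect.triggers import any_failed

    @task
    def connect():
        raise RuntimeError("boom")  # stand-in for the failing AD connection

    @task(trigger=any_failed)
    def alert_email(upstream_result):
        # if the assumption holds, upstream_result is the RuntimeError above
        print(f"Erro ao executar workflow: {upstream_result}")

    with Flow("AD Check - sketch") as flow:
        alert_email(connect())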

    Andrea Nerla

    02/04/2022, 3:08 PM
    Hi folks, I'm receiving this error every time I try to run a Prefect script. For context, I'm trying to receive Slack status notifications, so I added the SLACK_WEBHOOK_URL to config.toml, and I've also opened Prefect's __init__.py out of curiosity. I don't think I changed anything in it, but I might have misclicked something.
    File "C:\Users\andrea.nerla\AppData\Local\Programs\Python\Python39\lib\site-packages\box\box.py", line 516, in __getattr__
    value = object.__getattribute__(self, item)
    AttributeError: 'Config' object has no attribute 'datefmt'

    Matthew Seligson

    02/04/2022, 6:41 PM
    My flow A has a StartFlowRun task that runs flow B. If I’m looking at a flow run of flow A in the UI, how can I navigate to the corresponding run of flow B?

    William Grim

    02/04/2022, 8:41 PM
    Has anyone seen issues like this?
    $ prefect run -n example --watch
    Looking up flow metadata...Done
    Creating run for flow 'example'...Done
    └── Name: eccentric-groundhog
    └── UUID: bc79f3ec-5525-4be6-b327-375624abb387
    └── Labels: ['caretaker', 'input', 'output', 'prefect-agent-556bd57fdf-v74zj']
    └── Parameters: {}
    └── Context: {}
    └── URL: <http://localhost:8080/default/flow-run/bc79f3ec-5525-4be6-b327-375624abb387>
    Watching flow run execution...
    └── 20:33:26 | INFO    | Entered state <Scheduled>: Flow run scheduled.
    ── 20:33:43 | WARNING | It has been 15 seconds and your flow run has not been submitted by an agent. Agent 93e9ff4d-5fce-4b1d-ad1b-59925fd32f92 (agent) has matching labels and last queried a few seconds ago. It should deploy your flow run.
    └── 20:34:16 | WARNING | It has been 50 seconds and your flow run has not been submitted by an agent. Agent 93e9ff4d-5fce-4b1d-ad1b-59925fd32f92 (agent) has matching labels and last queried a few seconds ago. It should deploy your flow run.
    No agent is picking up any of our flows, and flow runs just stay in the "scheduled" state even though on a CLI run, it states that there is an agent with matching labels.

Kevin Kho

02/04/2022, 8:44 PM
It seems like there is an agent but it’s not healthy to pick things up. Would you be able to check that agent?

Anna Geller

02/04/2022, 8:45 PM
You would need to have an agent that has all those labels that you specified on your flow. Or the other way around: the labels on the agent must be a superset of those set on the run_config. Alternatively, you can run it agentless using --execute:
prefect run -n example --execute --watch

William Grim

02/04/2022, 8:46 PM
Yeah, the example flow has an input label, and the agents have that label and more.

Anna Geller

02/04/2022, 8:48 PM
it looks like your flow has more labels than just input. It has: ['caretaker', 'input', 'output', 'prefect-agent-556bd57fdf-v74zj'], and all those labels would need to be set on the agent as well

William Grim

02/04/2022, 8:49 PM
You're right, I thought it just had input, but it only has the first 3. The last label must be auto-added somewhere.
So our agents have all those labels. I think the "hostname" label is auto-added to all local prefect agents?

Anna Geller

02/04/2022, 8:50 PM
Yes exactly, it’s added by the registration process when you use local storage.
You can leave it off by using:
flow = Flow("local-flow", storage=Local(add_default_labels=False))
and on the agent:
prefect agent local start --no-hostname-label

William Grim

02/04/2022, 8:54 PM
Ohhh. I'm going to try that. That brings me to a different question: we haven't gotten to the point of having "kubernetes agents" yet, and we want to run X local agents. If we do what you just stated there, would any of the 5 agents pick up a flow?

Anna Geller

02/04/2022, 8:56 PM
If you really want to run multiple local agents, then you need to assign unique labels to each of those agents and to your flow runs to manually “load-balance” the flows across agents. This page explains the issue more https://docs.prefect.io/orchestration/agents/local.html#multiple-local-agents-with-the-same-label
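A small sketch of that manual load-balancing (the label names are illustrative): each agent gets a unique label, and each flow pins itself to exactly one of them:
# start each agent with its own label, e.g.:
#   prefect agent local start --label agent-1 --no-hostname-label
#   prefect agent local start --label agent-2 --no-hostname-label
from prefect import Flow
from prefect.run_configs import LocalRun

with Flow("pinned-flow", run_config=LocalRun(labels=["agent-1"])) as flow:
    ...  # tasks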

William Grim

02/04/2022, 8:56 PM
Yeah, okay, I've read that.
I have modified that example flow with your suggestion, and now it's saying things like
── 21:01:25 | WARNING | It has been 50 seconds and your flow run has not been submitted by an agent. Found 5 healthy agents with matching labels. One of them should pick up your flow.
I do understand that we aren't load-balancing properly, but the labels do match and one agent should pick this up. This is actually a bug we only recently started encountering, and there weren't any code changes that would seem related.
Is there another way to force an agent to start picking up flow runs?

Anna Geller

02/04/2022, 9:04 PM
The only way to force it is to assign exactly the same labels on the agent and on the run config 🙂 can you share your exact storage + run_config and the command used to start the agent?
also the command you used to start the flow run

William Grim

02/04/2022, 9:08 PM
Yeah, the command I'm using on the agents is (and I know we aren't load balancing, but we're working on getting there 🙂):
/usr/local/bin/python /usr/local/bin/prefect agent local start -l company_name -l input -l output -l caretaker --show-flow-logs
The "example" flow's code is:
with Flow(
    "example",
    state_handlers=[save_activity],
    run_config=LocalRun(labels=["input", "output", "caretaker"]),
    storage=storage.Local(add_default_labels=False),
) as flow:
I added the storage param after your input.

Anna Geller

02/04/2022, 9:16 PM
ok, so the problem is: if you don’t add the hostname label on the storage, then you also shouldn’t set it on the agent. So your agent command should be:
prefect agent local start -l company_name -l input -l output -l caretaker --show-flow-logs --no-hostname-label

William Grim

02/04/2022, 9:17 PM
gotcha. but earlier, all labels were set in the flow and a superset was in the agent.
It's just very weird b/c up until a few days ago, this was all working. it could be environmental, but i can't figure it out without something in agent logs or something saying why it won't pick up a flow run

Anna Geller

02/04/2022, 9:18 PM
does it work now?

William Grim

02/04/2022, 9:18 PM
no, it does not

Anna Geller

02/04/2022, 9:21 PM
agent:
prefect agent local start -l company_name -l input -l output -l caretaker --show-flow-logs --no-hostname-label
flow:
with Flow(
    "example",
    state_handlers=[save_activity],
    run_config=LocalRun(labels=["input", "output", "caretaker"]),
    storage=storage.Local(add_default_labels=False),
) as flow:
and you need to make sure that you register this flow from the same machine as the agent, otherwise this won’t work because you specified that your storage is local to the agent atm. If you run agents on a remote VM, you can explore other storage options like Git storage classes or cloud storage classes

William Grim

02/04/2022, 9:21 PM
i undid the no-hostname-label stuff
is there a way to see in a log somewhere the reason a flow won't be picked up? the logs coming out of prefect run say explicitly that an agent should pick up the flow

Anna Geller

02/04/2022, 9:23 PM
can you first explain your infrastructure? where is your agent process running? where are you registering your flow? does it all run on your laptop or on some VM?
w

William Grim

02/04/2022, 9:23 PM
k8s in aws
apollo, towel, db, agent, etc are in their own pods

Anna Geller

02/04/2022, 9:24 PM
you can’t use a local agent on Kubernetes, this won’t work

William Grim

02/04/2022, 9:24 PM
it does work
it's like a pod that's a VM

Anna Geller

02/04/2022, 9:25 PM
it will have side effects. I would encourage you to set up a KubernetesAgent if you are running this on Kubernetes.

William Grim

02/04/2022, 9:25 PM
ok. but are there some logs where i can see what's going on right now?
and i do agree with what you're saying, but this is already in prod and not something that can be hotfixed in that way
we've been running in this style setup for at least a year

Anna Geller

02/04/2022, 9:28 PM
you should be able to see it in the UI and in the relevant pods, and if you have access to your DB, you can also query it directly, e.g. query the flow runs; you can also do the same with a GraphQL query.
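For the GraphQL route, a hedged sketch (assuming the default apollo endpoint and the Hasura-generated field names; adjust both for your cluster) that lists runs stuck in Scheduled:
import requests

query = """
query {
  flow_run(where: {state: {_eq: "Scheduled"}}) {
    id
    name
    state
    scheduled_start_time
  }
}
"""
resp = requests.post("http://localhost:4200/graphql", json={"query": query})
print(resp.json())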

William Grim

02/04/2022, 9:29 PM
graphql i'm not too familiar with, unfortunately 😞 i only started hearing about it when i came onto our prefect codebase. but does it just store everything in the postgres backend somewhere? i do know how to work with that

Anna Geller

02/04/2022, 9:29 PM
yes - flow run table

William Grim

02/04/2022, 9:29 PM
oh, awesome! i'll start browsing in there and see what i can find. i'd also try the UI, but it's not easily accessible from prod

Anna Geller

02/04/2022, 9:31 PM
good luck. sorry if I can’t be too helpful here but it’s hard to remotely debug such infrastructure issues, especially with a non-typical setup when local agent runs on Kubernetes 😅 hope you understand. I can try to help next week if you won’t figure it out until then

William Grim

02/04/2022, 9:32 PM
I understand. Try not to focus on the k8s stuff 🙂 it's just a pod that runs a local agent as the cmd. but hopefully we'll have it sorted very soon
kubernetes agents are what i was working on, but been sidetracked with some stuff

Anna Geller

02/04/2022, 9:33 PM
it’s possible that it’s easier to just set up a new Kubernetes agent, set up new labels on your flow to match it with this K8s agent + reregister flows than to figure out how to debug this agent

William Grim

02/04/2022, 9:34 PM
if we had devops, then yes, perhaps 🙂
I'm not actually a real devops person
I just play one on tv

Anna Geller

02/04/2022, 9:34 PM
it’s 2 commands really:
prefect agent kubernetes install > agent.yaml # adjust yaml and apply
kubectl apply -f agent.yaml

William Grim

02/04/2022, 9:35 PM
we've got an atypical k8 setup at that; hard to explain
anyway, i'm just not familiar enough to set that up right now

Anna Geller

02/04/2022, 9:35 PM
hard to help then 😄

William Grim

02/04/2022, 9:35 PM
word
@Anna Geller Following up. I did finally get our prefect ui tunneled properly. All the old agents were still registered, and we noticed a lot of stuff that needed to be cleaned up. Once we removed the dead agents and things, it all started working again. I don't know exactly what was happening, but we think it had something to do with that.
Now we can continue the work we were doing to get towards kubernetes agents. We have been aiming for that but have normal stuff that comes up. 🙂

Anna Geller

02/05/2022, 10:25 AM
Nice work! Keep us posted if you have any questions along the way while migrating to a Kubernetes agent