prefect-community
  • t

    Tilak Maddy

    12/06/2021, 10:54 AM
    UserWarning: A flow with the same name is already contained in storage; if you changed your Flow since the last build, you might experience unexpected issues and should re-create your storage object.
     serialized_flow = flow.serialize(build=build) # type: Any
    Flow URL: https://cloud.prefect.io/XXX/flow/ZZZ
     └── ID: XXXX
     └── Project: test_proj_2
     └── Labels: ['dev']
    Hey y'all, I am running flows in multiple projects, but all the flows have the same name and the same storage (by design; I don't wish to change the name). However, when I register the flow to several projects, I get this warning every time I call
    flow.register(project_name)
    (with different metadata of course, like schedule time, etc.) on a different project. I want to know what that warning means. Since I have the flows set up in production, I want to know: if I update the flow metadata for one project, is it going to affect the flows in the other projects?
  • j

    Jelle Vegter

    12/06/2021, 12:11 PM
    Hey all, I'm attempting to switch from a local agent on a VM to a docker agent. When using docker-compose with image: prefecthq/prefect:latest I'm getting the following error. Does anyone have a clue what's going on here? Thanks!
  • t

    Thomas Nyegaard-Signori

    12/06/2021, 2:03 PM
    Hey community! I am seeing weird connection issues on Prefect. We are running relatively large flows (~2000 tasks), all of which are
    CreateNamespacedJob
    tasks, spawning jobs on our Kubernetes cluster. We are hosting our own Prefect server using the docker-compose command directly on a moderately large VM (Standard E8ds v5 VM type from Azure). It seems as if the flow pod, the pod responsible for orchestrating the tasks, is losing its connection to the backend, specifically the
    apollo
    service, as can be seen in the first screenshot. All of a sudden, all
    CreateNamespacedJob
    tasks would fail at the same time when the
    CloudTaskRunner
    went to update the state of the task. I did a bit of digging with
    netstat
    and it seems that there are quite a few TCP connections being created in the
    apollo
    container; however, I am not entirely sure if that is "business as usual" or a bit on the heavy side for this kind of setup. Has anyone else experienced these kinds of hiccups, or is anyone using a similar setup who might have ideas? I don't know whether the second screenshot is relevant, but it has started to pop up quite a lot and I can't seem to figure out what's causing it.
  • b

    brian

    12/06/2021, 3:17 PM
    I’m having an issue with prefect cloud. My agent is running and nothing looks wrong in the logs, but for some reason when I try to run a flow with the same label as the agent the flow doesn’t run because it says there are no agents with the label I’ve specified. Any debugging suggestions would be much appreciated!
  • j

    Jason Motley

    12/06/2021, 3:30 PM
    Is it possible to create a distribution list where I show whether a given job has succeeded/failed, only sending out once per day? For example, if I have 4 jobs running in the night and want to send a 6AM email update with just 3 of those jobs, is that possible via cloud hook? Rather than showing in the moment which have succeeded/failed.
  • d

    Daniil Ponizov

    12/06/2021, 4:46 PM
    Hi! Is it possible to run flows on different servers without kubernetes or aws cluster?
  • h

    Hugo Shi

    12/06/2021, 6:05 PM
    Is it possible to configure timeouts for a flow? I know that they can be configured per task - I was hoping to configure them at the flow level
  • h

    Horatiu Bota

    12/06/2021, 6:34 PM
    hi prefect-community! is there any way to force a(n over)write of a checkpointed/cached task result? say i have an S3Result, can i call write on it to overwrite the previously checkpointed result (something like
    s3result.write(overwrite=True)
    )?
  • t

    Theo Platt

    12/06/2021, 7:25 PM
    Hi there. I think this is a simple question! I have some flows (generally many hours long) where the last task fails but the flow continues to run. The last task happens to be an AWSClientWait on a Batch job but I don't know if this is the reason or not. Or alternatively is there a way to fail a flow if any of the tasks fail? Thanks as always!
  • i

    Isaac Brodsky

    12/06/2021, 8:19 PM
    Is there an updated example of using dask from within a Prefect flow? I.e. a task that calls
    dask.compute()
    using a Dask Executor? I'm trying to compare my upgraded (0.13.x to latest) flow code with a working example and right now my
    compute
    calls hang.
  • z

    Zane Selvans

    12/06/2021, 8:36 PM
    Hi all. We're just starting to use Prefect & Dask to coordinate our open source US energy data processing pipeline so we can do complete nightly continuous integration runs with all of the data, instead of just a small subset. Also interested in the potential for integration with Airbyte and dbt. https://github.com/catalyst-cooperative/pudl
  • l

    Leon Kozlowski

    12/06/2021, 9:07 PM
    When trying to build my flow (docker storage) with
    prefect build
    I am running into an error
    mkdir: cannot create directory '/opt/prefect/': Permission denied
    - I am creating a user in my Dockerfile through
    useradd
    It appears there are extra commands Prefect appends to the user-defined Dockerfile. Is the root user required?
  • j

    James McNeilis

    12/06/2021, 9:59 PM
    Hey 👋 We just hit this internal server error on one of our scheduled runs. We have hit restart, but are wondering whether there is any further context that can be provided?
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
        result = self.graphql(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I've put the task and flow run URLs in the 🧵.
  • a

    Alvaro Durán Tovar

    12/06/2021, 10:23 PM
    Hi! I can't see stdout messages from a Kubernetes job in Prefect Cloud. I have this on my jobs:
    PREFECT__CLOUD__SEND_FLOW_RUN_LOGS=true
    PREFECT__LOGGING__LOG_TO_CLOUD=true
    I'm using
    @task(log_stdout=True)
    and I can see the message in Kubernetes itself. I can see the message "hey budy" in the log, as you can see here, but it is not present in the Cloud UI. Am I missing something?
  • d

    Dominic Pham

    12/07/2021, 12:06 AM
    Hi all, let's say that I have a flow scheduled and running on cloud that launches another python process/program/script via a local agent and it runs on the weekends. Is there an agent that can persist on my local machine in the event that my machine restarts?
  • a

    alins

    12/07/2021, 11:35 AM
    Hi, I have a problem: when I define three tasks and call them one by one inside a flow, they don't run in order. How can I tell a flow the order in which to run the tasks (which runs 1st, which 2nd, and which 3rd)? I'm adding sample code as a reply to my message.
  • s

    sarika gowtham

    12/07/2021, 11:57 AM
    #dask error: unable to reach the TCP address. Timed out trying to connect to tcp://172.16.0.5:8786 after 30 s while executing
    executor = DaskExecutor(address="tcp://127.0.0.1:8787")
    flow.run(executor=executor)
  • z

    Zheng Xie

    12/07/2021, 1:05 PM
    I get errors when applying prefect_agent.yaml; please see https://stackoverflow.com/questions/70260659/prefect-kubernetes-agent-throws-authorizationerror-when-start
  • z

    Zheng Xie

    12/07/2021, 1:05 PM
    I have tried to follow the documentation to do some troubleshooting, but can’t figure it out.
  • t

    Tilak Maddy

    12/07/2021, 1:21 PM
    I want to know what exactly the Prefect agent pulls from storage. I'm asking this because in the main flow file, which is
    with Flow(...) as flow:
        a = first_task()
        b = second_task()
    say there are tasks which are defined in and imported from other files (and which in turn call other tasks, and so on). There is no way I can look at just this main file and tell what the entire flow is going to look like: what dependencies the tasks will have, retries, etc. Basically there isn't much info I can get. So what are we doing here, and why?
  • i

    Ievgenii Martynenko

    12/07/2021, 1:24 PM
    Hi, I have a task that should be executed every 30 minutes. Sometimes the task runs longer than 30 minutes, and another instance of the same task is then started in parallel, which causes logical and performance issues. Is there any option, for non-Cloud, not to start the same task if the previous execution has not finished yet? (It seems that for Cloud this is possible using labels and a limit on the number of parallel executions.)
  • j

    Jason Motley

    12/07/2021, 3:20 PM
    Is there a Prefect cloud/UI equivalent of this? https://docs.prefect.io/core/idioms/flow-to-flow.html
  • j

    John Shearer

    12/07/2021, 6:16 PM
    Is it expected that running a local flow with
    PREFECT__FLOWS__CHECKPOINTING=false
    but with checkpoint data present in the Prefect result directory would read from those results? I would not expect this, but it is the current behaviour (on my machine ...)
  • e

    Erik Schomburg

    12/07/2021, 6:31 PM
    Hello! I’m trying to convert an existing script into a Prefect flow, and the
    task.map
    functionality doesn’t quite work as I had expected. I’m aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in prefect, I have a task that splits the keys into subsets, and then maps a task over those subsets. The problem is that there’s a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but still want to make sure that successful sub-tasks have their results checkpointed, and unsuccessful ones are not. But the
    results = task.map(subset=subsets)
    code instead just stores the
    results
    in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding
    {map_index}
    to the task target filename pattern, but this did not work (_update_: it does work, I just had extra
    {
    brackets, i.e.,
    {{map_index}}
    🤦). Here’s the basic flow:
    all_keys = get_keys_task()
    key_subsets = partition_keys_task(all_keys, n_subsets)
    data_subsets = get_data_task.map(keys=key_subsets)
    all_data = concatenate_subsets_task(data_subsets=data_subsets)
    I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of
    .map
    ought to be to do this sort of results management for you... Any tips? Maybe there’s just a parameter in the
    prefect.task
    decorator or the
    task.map
    function I don’t know about?
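The brace mix-up mentioned in the update above can be reproduced with plain Python string formatting. This is a minimal sketch, assuming the target filename pattern is filled in with `str.format`-style substitution; the `render` helper and the file names are hypothetical and not part of Prefect.

```python
def render(template: str, **context) -> str:
    """Fill a target-style filename template the way str.format does."""
    return template.format(**context)


# Single braces are substituted, so each mapped child gets its own file name.
single = render("data-subset-{map_index}.pkl", map_index=3)

# Doubled braces are an escape: the result is a literal "{map_index}",
# so every mapped child would resolve to the same file name.
double = render("data-subset-{{map_index}}.pkl", map_index=3)

print(single)
print(double)
```

With the doubled braces every mapped child ends up with the same target name, which would match the symptom of all results landing in a single file.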
  • t

    Tony Yun

    12/07/2021, 7:49 PM
    Hello, we’ve recently noticed that the exception logs in Production are printing out a personal home folder path. Do you know how this happened?
    from pathlib import Path
    from prefect import task

    @task(log_stdout=True)
    def test():
        print(f'current folder: {Path.cwd()}')
        raise Exception('test exception')
    
    Task 'test': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state
        logger=self.logger,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/tonyyun/github/data_operations/dbt_automation/flow.py", line 296, in test
    Exception: test exception
    , where
    /Users/tonyyun/github
    shouldn’t be there^
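One possible explanation, not confirmed in this thread: Python records the source file path in a function's code object at the time the function is defined, and a traceback prints that recorded path wherever the function later runs. If the flow was serialized on a laptop, the laptop path would travel with it. The sketch below imitates this with `compile`; the path `/Users/someone/project/flow.py` is a made-up placeholder.

```python
import traceback

# Source for a function that always fails, compiled as if it lived at a
# (hypothetical) path on a developer machine.
source = "def test():\n    raise Exception('test exception')\n"
code = compile(source, "/Users/someone/project/flow.py", "exec")

namespace = {}
exec(code, namespace)

try:
    namespace["test"]()
except Exception as exc:
    # Each traceback frame reports the filename stored in its code object,
    # not the location of the process that is currently running.
    frames = traceback.extract_tb(exc.__traceback__)
    filenames = [frame.filename for frame in frames]

print(filenames[-1])
```

The last frame reports the path given at compile time, even though no such file exists on the machine running the code.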
  • j

    Jeffery Newburn

    12/07/2021, 8:34 PM
    We are using Prefect Cloud. We are curious how to get audit logs out of the server to push them to our monitoring system. Is there something that comes with the system that we can use for exporting? Is there any documentation on this?
  • b

    bitsofinfo

    12/07/2021, 8:56 PM
    Hi, I'm new to Prefect and just reading the docs. I see the section on "flow storage" and I'm trying to follow the developer workflow here. So I code a Python file that defines my flow and its tasks, submit this to Prefect, and it uses flow storage to actually store that script (i.e. vs. me coding a flow, checking it into git, then deploying my flows to Prefect)? I guess I'm not sure I'm following this.
  • j

    Jason Motley

    12/07/2021, 9:43 PM
    Is there a way to set up an if/else statement within a flow depending on if a SQL query errors out? For example,
    if(select * from table) = error then (alter table add column) else somethingelse
    ?
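The if/else idea in the question can be sketched in plain Python with a try/except around the probe query. This is only an illustration using the standard-library `sqlite3` module and an in-memory database; the `ensure_column` helper and the table and column names are hypothetical, and inside a flow the same logic could live in a single task.

```python
import sqlite3


def ensure_column(conn, table, column, col_type="TEXT"):
    """Return True if the column had to be added, False if the probe succeeded.

    Identifiers are interpolated directly, so they must come from trusted code.
    """
    try:
        # Probe query: fails with OperationalError if the column is missing.
        conn.execute(f"SELECT {column} FROM {table} LIMIT 1")
        return False
    except sqlite3.OperationalError:
        # The "error" branch: alter the table and add the missing column.
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {col_type}")
        return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")

first = ensure_column(conn, "events", "note")   # column missing, gets added
second = ensure_column(conn, "events", "note")  # column now exists
print(first, second)
```

Other database drivers raise different exception types for a failed query, so the `except` clause would need to match the driver in use.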
  • b

    Billy McMonagle

    12/07/2021, 10:45 PM
    Hi there! I hope today's AWS outages have not been too stressful for the team. Due to issues in us-east-1, my prefect agents are down, and a number of flows were marked as "late". I canceled several of them manually, and then clicked "Clear late runs" in the UI. Can you clarify what this actually does?
  • e

    Emma Rizzi

    12/08/2021, 8:11 AM
    Hello, question on Agents to understand the doc here : https://docs.prefect.io/orchestration/agents/overview.html
    A single agent can manage many concurrent flow runs
    -> Does this mean that if I schedule multiple flows at the same time for the same agent, the agent will be able to handle them in parallel? Or does it queue the jobs? Thanks
a

Amanda Wee

12/08/2021, 8:57 AM
Yeah, the agent does handle them in parallel.
e

Emma Rizzi

12/08/2021, 2:08 PM
@Amanda Wee thanks, do you have any idea how to optimize performance? If I put the agent on a 4-CPU instance, does it need configuration to use them all?
k

Kevin Kho

12/08/2021, 2:13 PM
It just keeps launching the processes with no configuration
e

Emma Rizzi

12/10/2021, 10:26 AM
thanks @Kevin Kho I'll try this soon!
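As a toy model of the "keeps launching the processes" behaviour described in this thread, the sketch below starts several child processes without waiting between launches, leaving it to the operating system to spread them over the available CPUs. It is not the agent's actual implementation; `launch_runs` and the sleep command are hypothetical stand-ins for flow runs.

```python
import subprocess
import sys


def launch_runs(n, seconds=0.2):
    """Start n child processes back to back, without waiting on any of them."""
    cmd = [sys.executable, "-c", f"import time; time.sleep({seconds})"]
    return [subprocess.Popen(cmd) for _ in range(n)]


# All four children are already running before the first wait() call,
# so they overlap in time instead of being queued one after another.
procs = launch_runs(4)
exit_codes = [p.wait() for p in procs]
print(exit_codes)
```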