prefect-community
  • f

    Florian Guily

    03/23/2022, 10:39 AM
    Hi, I'm just getting into logging. Are there more log types than info and warning? If so, where can I find the list?
    a
    • 2
    • 7
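For reference on the question above: assuming Prefect's logger behaves like a standard Python `logging.Logger` (an assumption here, not something confirmed in this thread), the available levels are the standard library ones. A minimal sketch using only `logging`; the logger name `example` and the helper `level_numbers` are hypothetical:

```python
import logging

# Standard library level names, from least to most severe.
LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]


def level_numbers():
    """Map each standard level name to its numeric value."""
    return {name: getattr(logging, name) for name in LEVELS}


logger = logging.getLogger("example")  # hypothetical logger name
logger.setLevel(logging.DEBUG)         # let every level through this logger

# One call per level; each method name matches the level it emits.
logger.debug("detailed diagnostic output")
logger.info("normal progress message")
logger.warning("something unexpected, but still running")
logger.error("an operation failed")
logger.critical("the program may be unable to continue")
```

Which of these messages actually appear depends on the handlers and level configured for the logger in use.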
  • n

    Nelson Griffiths

    03/23/2022, 1:20 PM
    I am running a prefect agent with Orion right now with a deployed flow. The agent runs flows just fine if I start it and go hit quick run in the UI. But if I leave the agent sitting for too long I start getting this 403 error:
    Traceback (most recent call last):
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/cli/base.py", line 59, in wrapper
        return fn(*args, **kwargs)
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
        return asynclib.run(func, *args, **backend_options)
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
        return native_run(wrapper(), debug=debug)
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
        return future.result()
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
        return await func(*args)
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/cli/agent.py", line 71, in start
        await agent.get_and_submit_flow_runs()
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/agent.py", line 88, in get_and_submit_flow_runs
        submittable_runs = await self.client.get_runs_in_work_queue(
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/client.py", line 747, in get_runs_in_work_queue
        response = await self._client.post(
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/httpx.py", line 137, in post
        return await self.request(
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/prefect/utilities/httpx.py", line 80, in request
        response.raise_for_status()
      File "/home/nelson/miniconda3/envs/my_project/lib/python3.9/site-packages/httpx/_models.py", line 1510, in raise_for_status
        raise HTTPStatusError(message, request=request, response=self)
    httpx.HTTPStatusError: Client error '403 Forbidden' for url 'https://api-beta.prefect.io/api/accounts/df4b7089-cc2a-48ae-b4ce-baea44b163d6/workspaces/b22af91f-f810-4bc3-ac90-a1fa0e042c55/work_queues/c91e1439-be7e-4a98-8df0-da39515197b2/get_runs'
    For more information check: https://httpstatuses.com/403
    An exception occurred.
    Any ideas what might be causing this?
    k
    a
    • 3
    • 28
  • s

    Shrikkanth

    03/23/2022, 2:16 PM
    Hi guys, is there a query to execute or run a Prefect flow from the Prefect Interactive API?
    k
    • 2
    • 4
  • c

    Chris Reuter

    03/23/2022, 2:33 PM
    Hey all! Join @Kevin Kho and myself on PrefectLive 📺 today at 3p Eastern. He'll be running through some data validation workflows, and I'll be distracting Kevin. See 👀 you in chat!
    :upvote: 2
    :marvin: 1
    :marvin-duck: 2
  • k

    Kyle Austin

    03/23/2022, 2:34 PM
    Our team wrote a flow state handler that changes the new_state of the flow to Finished('message'). For some reason this causes our flows to look like they are running ad infinitum. For example, the flows we ran a week ago show a run duration of 7 days, but there isn't anything running as far as we can tell. Any ideas why changing the state to a Finished state causes the flow runs in the Prefect UI to look like they are running forever? When we instead set the new state to Cancelled('message'), we don't see this behavior, and the flow duration is exactly what it should be. It only happens when we set the new state to a Finished instance.
    k
    • 2
    • 18
  • d

    Daniel Komisar

    03/23/2022, 3:44 PM
    Hello everyone, in prefect cloud is there any way to turn off the Lazarus retry? Thank you.
    k
    • 2
    • 5
  • d

    Domenico Di Gangi

    03/23/2022, 3:44 PM
    Hi all, I am experimenting with Orion and I find it amazing so far. I noticed that the Orion docs don't mention logging stdout, as is possible in Prefect 1.0. Also,
    @task(log_stdout=True)
    gives an error in Orion. Is there an alternative way to log stdout in Orion?
    k
    m
    • 3
    • 3
  • j

    Jake

    03/23/2022, 3:44 PM
    What does it mean when there are “Late submittable runs”? Our agent was successfully registered and labelled with the right labels but the flows are not running.
    k
    • 2
    • 10
  • f

    Florian Guily

    03/23/2022, 4:33 PM
    Hello, I'm running into an issue where I have tasks that are skipped even though none of their upstream tasks fail. Looking at the execution schema in the dashboard, I see that the task in charge of loading secrets, such as the API key, has been skipped because the first task that needed it was skipped (that one is intended behaviour). I suppose that my last task got skipped because the secret it needs as an argument was skipped earlier. Am I right? If so, how can I "force" the task to get the secret? Here is the schema:
    k
    • 2
    • 6
  • r

    Rachel Funk

    03/23/2022, 5:20 PM
    Hello! We're running into the same
    no heartbeat detected
    error that I've seen a few other folks flag. I read through the discourse documentation and the FAQ. I don't think it's a memory issue because a much larger Flow is running from the same agent + Google Cloud VM without any issues. I can't seem to use the proposed solutions to configure heartbeats to use threads because I'm realizing that my Flows do not use:
    from prefect.run_configs
    So, I'm not using UniversalRun or ECSRun. I'm guessing the setup of my flow might pre-date that library. Is that possible? Should I import that library if my Flows have been running fine without it? I should mention that this heartbeat issue only started to crop up a few days ago.
    k
    • 2
    • 15
  • j

    Jason Bertman

    03/23/2022, 5:26 PM
    Hey Community, Having an issue with the Apollo service in an EKS cluster. It works pretty well, until ~500-1000 mapped tasks are trying to update their task_run_state. The problem is that if even one fails, it gets stuck in a pending state and the flow just hangs. Eventually, the prefect-job pod exits and things continue to go sideways from there.
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1922, in set_task_run_state
        result = self.graphql(
    ...
        raise ConnectionError(e, request=request)
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefecthq-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f12486259d0>: Failed to establish a new connection: [Errno -2] Name or service not known'))
    This error would seem to indicate that the service can't even be resolved, but this DNS name is perfectly resolvable from the dask pods, and many tasks succeed before a couple fail. I see signs of distress from the apollo service in the form of:
    BadRequestError: request aborted
        at IncomingMessage.onAborted (/apollo/node_modules/raw-body/index.js:231:10)
        at IncomingMessage.emit (events.js:315:20)
        at abortIncoming (_http_server.js:561:9)
        at socketOnClose (_http_server.js:554:3)
        at Socket.emit (events.js:327:22)
        at TCP.<anonymous> (net.js:673:12)
    repeated pretty much as many times as the dask pod tries it. Light research on this case points to the service being overwhelmed. Is this case covered by the request retrier here? https://github.com/PrefectHQ/prefect/blob/a2041c7ff1a619e611f614ed3394fccd05bb2005/src/prefect/client/client.py#L633 If not, what's the best way to handle this case? Are there any configuration changes I could make to the Apollo pod?
    k
    • 2
    • 31
  • k

    Ken Nguyen

    03/23/2022, 6:08 PM
    Hi, I’m currently running into issues trying to do a simple GET request for logs from the GraphQL API. Could anyone help me identify the mistake that produces the error below? I’ve tested the query in the Prefect front end’s Interactive API and it works just fine. I’ve also confirmed with a JavaScript GET request that the API key works fine, so this may be a mistake related to the requests library?
    k
    • 2
    • 15
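As background for the question above: GraphQL queries are conventionally sent over HTTP as a POST with a JSON body that has a `query` field. The sketch below only builds such a request with the standard library and does not send it. The endpoint URL, the API key, and the `Bearer` authorization scheme are placeholders and assumptions, not details taken from this thread, and the helper name `build_graphql_request` is hypothetical:

```python
import json
import urllib.request


def build_graphql_request(url, api_key, query, variables=None):
    """Build (but do not send) a POST request carrying a GraphQL query."""
    payload = {"query": query, "variables": variables or {}}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumption: the API expects a bearer token.
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


request = build_graphql_request(
    "https://example.invalid/graphql",  # placeholder endpoint
    "MY_API_KEY",                       # placeholder key
    "query { __typename }",             # minimal query valid against any schema
)
# Sending it would look like: urllib.request.urlopen(request)
```

Whether the original error came from the HTTP method, the headers, or the body encoding is not stated in the message, so this is only a shape to compare against.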
  • a

    Anatoly Myachev

    03/23/2022, 6:21 PM
    Hello all! When executing
    prefect orion kubernetes-manifest | kubectl apply -f -
    command,
    kubernetes
    queue is not created, although according to the documentation it should be. Has anyone seen something similar?
    k
    m
    • 3
    • 4
  • h

    Hedgar

    03/23/2022, 6:37 PM
    Hey guys, is there a working example of how to upload CSV files from a local machine to S3 using awswrangler? As a Prefect task, of course.
    a
    • 2
    • 10
  • r

    Rajan Subramanian

    03/23/2022, 6:45 PM
    Hi guys, when I hit run in the Prefect UI on the cloud, it creates a separate run rather than just restarting the run I had just started.
    • Is there a way to run it just once?
    • If I run a deployment file with many deployments, say 30, it's painful to go to the UI manually and hit run on each of them. There must be some option in the deployment spec to ensure that a deployment that's not on a schedule gets run immediately upon deployment.
    • I want the ability to constantly check whether a task is running and, if it's not, to initiate a run (a signal or whatever).
    • I want the ability to email my team and me in case something crashes on the Prefect side that was not picked up on the Linux side.
    a
    k
    • 3
    • 42
  • c

    Chris Reuter

    03/23/2022, 6:55 PM
    Starting in 5 minutes! See you on Twitch! https://prefect-community.slack.com/archives/CL09KU1K7/p1648046010730119
    🔥 3
    :marvin: 3
    ❤️ 3
  • k

    kevin

    03/23/2022, 8:06 PM
    I’m like 99% certain that this is the case but I’m just looking to confirm: when I have a task that creates a piece of mutable data, say a dictionary, and I pass it downstream to two different tasks, both tasks get a reference to the original dict, right? So if one task edits it, there's a risk of conflict. Example:
    with Flow('foo') as f:
        a = task(lambda x: {'key': x})('value')
        b = task(lambda y: y.pop('key'))(a)
        c = task(lambda z: z.get('key'))(a)
    a
    k
    • 3
    • 13
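The concern above can be illustrated in plain Python, without Prefect. Whether two tasks really receive the same object depends on how the executor passes results between them, which this thread excerpt does not settle; the sketch only shows what happens when they do, and how a deep copy avoids it. All function names here are hypothetical:

```python
import copy


def make_data(value):
    """Create a mutable dict, like the first task in the example."""
    return {"key": value}


def pop_key(d):
    """Remove and return the value; this mutates the dict it receives."""
    return d.pop("key")


def get_key(d):
    """Read the value without mutating the dict."""
    return d.get("key")


# Both consumers receive the same object: the pop is visible to the reader.
shared = make_data("value")
popped = pop_key(shared)
after_pop = get_key(shared)  # the key is gone by now

# Giving the mutating consumer its own deep copy protects the original.
original = make_data("value")
popped_copy = pop_key(copy.deepcopy(original))
after_copy = get_key(original)  # the original still has the key
```

A defensive copy inside any function that mutates its input keeps the outcome independent of execution order.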
  • a

    Anatoly Myachev

    03/23/2022, 8:20 PM
    Is it possible to configure Orion to use `KubernetesFlowRunner`+
    DaskTaskRunner
    that use
    dask_cloudprovider.aws.FargateCluster
    ? There are no logs in the Orion UI except
    'task-b610b3b0-0' - Crash detected! Execution was interrupted by an unexpected exception.
    and
    RuntimeError: IOLoop is closed
    but more detail is visible in the CloudWatch console, which contains the following error:
    Exception: 'RuntimeError("Cannot orchestrate task run \'9987d480-fad1-433a-9749-077a63fbdc0a\'. Failed to connect to API at http://orion:4200/api/.")'
    . Note: Orion is deployed on Kubernetes via
    orion kubernetes-manifest
    command.
    a
    • 2
    • 7
  • k

    Kevin Kho

    03/23/2022, 8:27 PM
    message has been deleted
    🙏 1
    📺 1
  • a

    Adam

    03/23/2022, 8:32 PM
    Hey Prefect team, I have an urgent question regarding Prefect Cloud. Since switching from the legacy payment tier and hitting the 20k free cap, all our tasks are halted with UsageExceeded. We have a credit card on file but nothing is going through. Please help, as we've ground to a standstill.
    k
    k
    a
    • 4
    • 17
  • e

    Edward Chen

    03/23/2022, 8:46 PM
    Hey Prefect team! I was wondering if there's documentation or a github repo that I can look at for integrating Singer taps with Prefect? What I would like to do is run Singer taps in a prefect flow. I did some research and I see Singer tap integrations with Airflow and Dagster so I assume Prefect has the capabilities too but I can't seem to figure it out 😅
    k
    a
    • 3
    • 5
  • k

    Ken Nguyen

    03/23/2022, 11:21 PM
    If I have a flow like :
    with Flow("flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
      json_data = get_json_data(
        url, query, headers,
        task_args={"name": "Getting Flow Data"}
      )
    How can I then access json_data as a python object, rather than a FunctionTask object?
    k
    • 2
    • 9
  • r

    Richard Freeman

    03/24/2022, 1:25 AM
    Hi all, I was trying to rename my Team but was unable to, so I deleted it, and now I'm not able to add a new one. Is this a bug? Will this be a big issue?
    k
    a
    • 3
    • 7
  • k

    Ken Nguyen

    03/24/2022, 2:23 AM
    What is the difference between
    StartFlowRun
    vs
    create_flow_run
    ? They seem to be slightly different in the documentation but have the same end result.
    k
    • 2
    • 3
  • j

    Jeff Kehler

    03/24/2022, 2:52 AM
    Hello all. I've run across an issue that I just can't seem to figure out and I feel like it's probably something really simple. I've structured my project like so.
    my_module/
      flows/
        my_flow.py
      tasks/
      lib/
         tasks/
           shared_task.py
    So in
    my_flow.py
    I am importing from
    my_module.lib.tasks.shared_task.py
    where I've created a reusable Task class. But I am unable to register this flow using the prefect command
    prefect register --project Test -m my_module.flows.my_flow
    I've tried many different combinations of the
    prefect register
    command and it just continues to generate a
    ModuleNotFoundError: No module named 'my_module'
    However, if I just use
    flow.register()
    inside of
    my_flow.py
    and execute it using
    python my_module/flows/my_flow.py
    it registers just fine.
    k
    • 2
    • 6
  • m

    Muddassir Shaikh

    03/24/2022, 5:03 AM
    Logs are not visible in the UI. How can I fix this?
    :discourse: 1
    a
    k
    p
    • 4
    • 22
  • a

    Abuzar Shaikh

    03/24/2022, 6:48 AM
    Hey guys, is there a way we can access the Postgres DB for Prefect and query data from it? I am unable to find or access the database. Any commands, default paths, or documentation would be helpful. Thanks
    a
    • 2
    • 1
  • i

    Ievgenii Martynenko

    03/24/2022, 9:20 AM
    What is the proper way to stop local Prefect? I run 'prefect server start' as the backend server and then press Ctrl+C to stop it. The next time I start it, Prefect throws a lot of duplicate violation errors and doesn't start properly (you can't create a project, for example).
    a
    • 2
    • 4
  • i

    Ievgenii Martynenko

    03/24/2022, 9:32 AM
    Also, there are 2 more strategic questions: 1. How does Prefect not detect that a flow fails? I have a Flow with a single Task. The Task calls some external module and that module throws an exception, but the Task and Flow in Prefect complete successfully?
    a
    • 2
    • 7
  • i

    Ievgenii Martynenko

    03/24/2022, 9:44 AM
    2) How to handle multithreading inside custom modules? The Task runs from Prefect, but the code is defined in a custom module. If there is a multithreading block like the one below, Prefect will not wait until all futures/threads are completed, but marks the task as successful immediately and exits.
    # with concurrent.futures.thread.ThreadPoolExecutor(max_workers=10,
    #                                                   thread_name_prefix="flow_") as executor:
    #     futures = [executor.submit(self._run, df) for df in self.dfs]
    #
    #     for future in as_completed(futures):
    #         try:
    #             future.result()
    #         except Exception as exc:
    #             logger.exception(exc)
    a
    k
    • 3
    • 6
a

Anna Geller

03/24/2022, 1:34 PM
Normally with Prefect, you shouldn't have to use multithreading since when you use mapping, Prefect does that for you automatically
but if you want to use multithreading yourself, you would need to make sure that the Prefect context stays thread-safe. This is a bit tricky: there is some functionality built into Prefect to make that easier, but it would involve some work on your end. In contrast, if you leverage mapping and e.g.
LocalDaskExecutor
that uses multithreading under the hood by default, then Prefect will make sure that the context stays thread safe
btw thank you so much for posting separate questions in separate threads, you are a role model! 🙏
i

Ievgenii Martynenko

03/24/2022, 2:23 PM
Yeap, for DaskCluster it's clear. It controls multithreading itself and all of that works fine. I was asking in the context of our library, which has built-in multithreading features that allow you to submit tasks from any command line, but when you take that library as is and run it in Prefect, that won't work. So basically we need to wrap each library function in a Prefect Task and then pass them through Prefect's mapping feature.
k

Kevin Kho

03/24/2022, 2:46 PM
Coming from other thread, if you pass the
logger
as an input into your multiprocessing function, it will work, but really just use LocalDask and then you get observability into your tasks as well
a

Anna Geller

03/24/2022, 3:34 PM
Yeap, for DaskCluster it's clear. It controls multithreading itself and all of that works fine.
Actually,
LocalDaskExecutor
doesn't use a distributed Dask cluster; it runs on Dask's local scheduler, which uses multithreading by default. Check out https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374
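One observation on the commented-out block in the question, sketched with the standard library only: the `with ThreadPoolExecutor(...)` block does wait for all submitted futures before it exits, but the `except` branch logs each exception and swallows it, so the calling function returns normally even when jobs failed. Whether that explains the behaviour described in this thread is an assumption. The hypothetical helper `run_all` below collects the failures and re-raises, so the caller (for example a task body) fails visibly:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(func, items, max_workers=10):
    """Run func over items in threads; raise if any job failed."""
    results = []
    errors = []
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix="flow_") as executor:
        futures = [executor.submit(func, item) for item in items]
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as exc:
                # Record the failure instead of only logging it.
                errors.append(exc)
    # All futures are finished once the with-block has exited.
    if errors:
        raise RuntimeError(
            f"{len(errors)} of {len(items)} jobs failed"
        ) from errors[0]
    return results
```

Results arrive in completion order, not submission order, so sort or key them if the order matters.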