prefect-community
  • l

    Lee Mendelowitz

    05/25/2022, 12:47 PM
    Hi everyone - is there an easy way to get the Docker container ID for a flow run? We currently get it by checking the Prefect logs for the flow run, but I’m wondering if there’s a better way. We’re using the Docker agent and we are on Prefect 0.15.13.
    a
    • 2
    • 13
  • r

    Robin Weiß

    05/25/2022, 12:53 PM
    Hey everyone! I am currently evaluating the use of Prefect Cloud as the main workflow automation tool for a client. I have been trying to find details on where exactly the servers would be hosted, but information seems scarce on the website on that topic. Because of EU privacy laws, guaranteeing that our data never leaves the EU is a must-have requirement. Did I miss something on the Prefect website, or is there maybe someone here with similar issues? Thanks a lot 🙂
    a
    • 2
    • 2
  • n

    Naga Sravika Bodapati

    05/25/2022, 1:13 PM
    Is there a way to set the start time of an IntervalClock for a flow using an input parameter?
    a
    • 2
    • 15
  • f

    Florian Guily

    05/25/2022, 1:14 PM
    Hey, I finally deployed Prefect on a k8s cluster on AWS thanks to your advice! I was wondering if I could use the
    local dask executor
    in this cluster, or if additional configuration is needed to use dask as an executor in a k8s cluster?
    e
    • 2
    • 2
  • a

    Andrew Lawlor

    05/25/2022, 1:47 PM
    What is the default cpu_request for a Kubernetes run_config?
    a
    • 2
    • 1
  • c

    Constantino Schillebeeckx

    05/25/2022, 1:58 PM
    I have a flow with a section of mapped tasks, one of them failed due to a missing heartbeat - the final flow state is
    success
    though. why wouldn't this flow's overall state be set to
    failed
    ?
    a
    • 2
    • 14
  • a

    Anna Geller

    05/25/2022, 3:11 PM
    Hi everyone! :prefect: Just FYI: I'm OOO for the rest of the week - if you have any urgent requests, Kevin is around. I'll respond to any open threads you have with me next week. If any of this resolves itself by then, or if you have some updates, it would be great if you could add a short message about the current state. Thanks and have a great rest of the week! 🙌
    👋 2
    👏 7
    m
    • 2
    • 1
  • x

    Xavier Babu

    05/25/2022, 4:34 PM
    Is there any reason a scheduled job wouldn't run on time if it is scheduled once using RRule? I see it is scheduled with the correct DTSTART and Count = 1, but the Prefect agent is not picking it up and running it immediately. Prefect Orion 2.0b
    k
    • 2
    • 16
  • w

    will milner

    05/25/2022, 5:18 PM
    Is there any reason why flow runs would suddenly not be able to be scheduled? I didn't make any updates at all to my server or agent and since Thursday I'm not able to run any flows. I'm using a kubernetes agent. I have no idea why this started happening or how to go about fixing this
    k
    • 2
    • 7
  • j

    John Kang

    05/25/2022, 6:04 PM
    Hi all, I'm thinking of using a ShellTask and activating a specific virtual environment using the
    helper_script
    argument. Does anyone have any idea how I would write that script up?
    k
    • 2
    • 5
  • k

    Kyle Pierce

    05/25/2022, 6:53 PM
    Hey, I have a nested map (three). It is using the output of the last result instead of the list of outputs. Do I need to do something to make the output of the first map iterable?
    k
    • 2
    • 16
  • f

    Florian Kühnlenz

    05/25/2022, 7:06 PM
    Are there problems with accessing prefect secrets right now?
    k
    • 2
    • 5
  • j

    John O'Farrell

    05/25/2022, 7:31 PM
    Hello, I'm currently trying to register a flow to amazon ECR using the docker storage. I want to include an existing dockerfile to handle the different modules the flow needs, but when I add the dockerfile parameter when setting the flow's storage it causes the process to get stuck on
    prefect.Docker:Building the flow's Docker storage...
    after creating a new temp directory with the dockerfile and
    healthcheck.py
    k
    • 2
    • 41
  • j

    Jared Noynaert

    05/25/2022, 7:34 PM
    @Kevin Kho, did you ever find out more about https://github.com/PrefectHQ/prefect/issues/5769? This is impacting us on AKS also edit: the reason I'm asking is that we are probably going to have to change our default executor for all flows, just to enable task observability....!
    k
    m
    m
    • 4
    • 23
  • n

    Nikhil Jain

    05/26/2022, 12:08 AM
    @Kevin Kho I am unable to access SSM parameters and ENV variables in my flow task. I am using
    ECSRun
    as follows:
    ecs_run = ECSRun(
        run_task_kwargs={'cluster': 'dev-prefect-cluster'},
        task_role_arn='arn:aws:iam::<>:role/dev-task-runner-ecs-task-role',
        execution_role_arn='arn:aws:iam::<>:role/dev-task-runner-ecs-task-execution-role',
        image='<>.dkr.ecr.us-west-1.amazonaws.com/dev-automation-scripts-ecr:latest',
        labels=['ecs', 'dev']
    )
    flow.run_config = ecs_run
    ...
    And then in my terraform config for the
    prefect-agent
    , I am setting a
    task_definition
    . And this
    task_definition_file
    is stored in s3 and contains a list of
    env
    parameters and
    secrets
    that are pulled from SSM parameter store. What’s happening is that when the flow-task is created, it is not getting any of the
    env
    and
    secrets
    that I supply in the task_definition_file. I can kinda see why this is happening: I am not supplying any task_definition in the
    ECSRun
    config during flow registration. The reason is twofold: I don’t want to hard-code these env/SSM parameters in Python code, since they are supposed to live in Terraform configs. We also want to share these env and SSM parameters across many tasks, so we don’t want to have to define them every time. Is there a way to accomplish this?
    k
    • 2
    • 14
  • a

    Alex de Geofroy

    05/26/2022, 1:13 AM
    Hey everyone. Not sure the right person to address this to, but I'm having issues with creating Prefect Agents in Azure. The error I'm receiving is "This offer is not available for subscriptions from Microsoft Azure Cloud Solution Providers." Is this something the Prefect team can enable, or is there other guidance on how to configure a subscription that will enable me to use the Agent? Here's an article I found describing the process for enabling the CSP program: https://docs.microsoft.com/en-us/azure/marketplace/cloud-solution-providers?WT.mc_id=AZ-MVP-5000120
    k
    • 2
    • 11
  • t

    Thomas Henn

    05/26/2022, 5:25 AM
    Hi everyone ! What is the proper way to use a debugger with Prefect Orion? For example, is there any debug setting so that Prefect Orion doesn't capture the exceptions within tasks? I couldn't find anything about this in the documentation. Thanks in advance!
    k
    m
    • 3
    • 6
  • d

    Dharit Sura

    05/26/2022, 5:55 AM
    Hi everyone, I am having an issue running Prefect 2.0 flows. Can anyone take a look at the error and help me understand what is happening? I have tried uninstalling and reinstalling Prefect as well as Python, but the issue persists.
    ✅ 1
    k
    • 2
    • 8
  • d

    Dharit Sura

    05/26/2022, 5:56 AM
    ✅ 1
  • s

    Sumant Agnihotri

    05/26/2022, 6:16 AM
    Hi all, I have set up my Prefect Server, however, the
    API Tokens
    option is disabled. How do I connect to my server, to register Flows, etc?
    k
    • 2
    • 1
  • s

    Samuel Hinton

    05/26/2022, 6:52 AM
    Hi team! I currently have the slack integration turned on so I get flow failures posted. However, is there a way to actually configure what is reported using this web hook? Right now, every failure just gives us the same generic “Some reference tasks failed.” error message, when ideally it would be great to be able to pass on some of the logs from any task that failed. Is this possible to do?
    k
    • 2
    • 2
  • k

    Kayvan Shah

    05/26/2022, 7:36 AM
    What does this error trace mean?
    Traceback (most recent call last):
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
        result = await run_sync_in_worker_thread(flow_call)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/home/kayvan/projects/prefect-demo/flows/spire_current_weather.py", line 55, in spire_current_weather
        sample = random.choice(conf)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/random.py", line 346, in choice
        return seq[self._randbelow(len(seq))]
    TypeError: object of type 'PrefectFuture' has no len()
    using random.choice() to select randomly from a list
    k
    • 2
    • 3
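The traceback above looks like what happens when a future-like object is handed to `random.choice()` before it has been resolved to the underlying list. The sketch below reproduces the same class of error with the standard library's `concurrent.futures.Future` as a stand-in. It is not Prefect's `PrefectFuture`, and `load_conf` is a hypothetical helper. It then shows that resolving the future first gives `random.choice()` a real list. In Prefect 2.0 betas the analogous step is presumably calling `.result()` on the task's return value, but that is an assumption to check against the docs for the version in use.

```python
import random
from concurrent.futures import Future, ThreadPoolExecutor


def load_conf():
    # Hypothetical stand-in for the task that produces `conf`.
    return ["station-a", "station-b", "station-c"]


def pick_unresolved(future: Future) -> str:
    # Passing the future itself fails because a Future has no length.
    try:
        random.choice(future)
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "no error"


def pick_resolved(future: Future) -> str:
    # Resolve the future first, then choose from the underlying list.
    return random.choice(future.result())


with ThreadPoolExecutor(max_workers=1) as pool:
    conf_future = pool.submit(load_conf)
    error_message = pick_unresolved(conf_future)
    sample = pick_resolved(conf_future)

print(error_message)
print(sample)
```

The point of the stand-in is only the ordering: resolve first, then treat the value as a sequence.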
  • v

    Vadym Dytyniak

    05/26/2022, 8:47 AM
    Hi. Is it possible to access the current flow object in a custom task implementation?
    k
    • 2
    • 21
  • a

    Abokor Ahmed

    05/26/2022, 8:50 AM
    Hello team. I'm fairly new, forgive me for my stupidity. But I have this error: I want to run this command: "prefect create project tester". I have configured my config.toml, but nothing!
    k
    • 2
    • 1
  • s

    Shrikkanth

    05/26/2022, 9:04 AM
    Hi guys, I'm fairly new to Prefect and I'm trying to send reports of the state of the flow to Slack. Below is my code. When I run it from Prefect Cloud, I get the status message in Slack, but I am not able to get the error message into the message section. I need to know which function to call, or where, to edit the message section. The
    logger.info(lines)
    contains the error message. Any suggestions will be helpful, thanks.
    k
    • 2
    • 2
  • s

    Sanjay Patel

    05/26/2022, 10:32 AM
    Hi, can I please get some assistance. I'm successfully running a flow in my dask cluster - the flow is launched from outside the dask cluster using the following
    output = flow.run(executor=DaskExecutor(address=XXX, client_kwargs=XXX))
    However, the flow contains a task that actually has a Prefect flow embedded in it: the primary task prepares an unknown number of parallel tasks (let's say 15 of them) and then tries to execute them. I would like the 15 to be run in parallel, managed by dask. I don't want to create a new dask cluster from within the primary task (not even sure if that would work) but instead want to execute the flow in the same cluster. The following line works, but I'm not sure it's going to utilize all the available workers; it may only execute on the worker that initiates the flow. Any guidance on how I should be running flow2, which is prepared and called by the primary task in 'flow' above?
    output2 = flow2.run()
    I believe I'd need to use something like this if I were using pure dask - https://distributed.dask.org/en/stable/task-launch.html - but I'm not sure which Prefect method achieves this, and I would like to keep the additional features that Prefect offers. Thanks so much in advance!
    k
    • 2
    • 2
  • s

    Stéphan Taljaard

    05/26/2022, 10:45 AM
    Hi. How do you save (and read) dollar signs in a secret? I'm on Prefect Server, implemented in GCP. The secret comes from Google Secret Manager. I have a custom secret function that, in development, pulls the secret from my PC's environment variables. I develop on Windows. I now have a DB with a password containing dollar signs. Because of this line, I can't read the password out correctly. Thoughts? Example:
    import os
    from prefect.client import Secret
    
    secret_name = 'JSON_SECRET'
    # os.environ[
    #     f"PREFECT__CONTEXT__SECRETS__{secret_name}"
    # ] = '{"db1": {"password": "password", "user": "userA"}, "db2": {"password": "P@$$w0rd", "user": "userB"}}'
    
    secret_value = Secret(secret_name).get()
    print(secret_value['db2']['password'])
    # This prints 'P@$w0rd', not 'P@$$w0rd'
    k
    • 2
    • 4
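One possible explanation for the lost dollar sign, assuming the line referred to above expands environment variables with `os.path.expandvars` (the message does not say so): on Windows `os.path` is `ntpath`, whose `expandvars` treats `$$` as an escaped `$`. The sketch below calls `ntpath.expandvars` directly so the behaviour can be checked on any OS. The `escape_dollars_for_windows` helper is a hypothetical workaround, not part of Prefect.

```python
import ntpath
import posixpath

raw_password = "P@$$w0rd"

# ntpath is what os.path resolves to on Windows; its expandvars
# collapses "$$" into a single "$".
windows_result = ntpath.expandvars(raw_password)

# posixpath is what os.path resolves to on Linux/macOS; it only substitutes
# "$name" when an environment variable with that name exists.
posix_result = posixpath.expandvars(raw_password)


def escape_dollars_for_windows(value: str) -> str:
    # Hypothetical workaround: double each "$" so that the Windows
    # expansion collapses every pair back to the intended character.
    return value.replace("$", "$$")


round_tripped = ntpath.expandvars(escape_dollars_for_windows(raw_password))

print("windows-style expansion:", windows_result)
print("posix-style expansion:  ", posix_result)
print("after escaping:         ", round_tripped)
```

If this is the cause, a value escaped this way is only correct on Windows, so the escaping would belong in the development-only code path.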
  • k

    Kayvan Shah

    05/26/2022, 11:06 AM
    2022-05-26 11:04:12 | [INFO]:numexpr.utils - NumExpr defaulting to 2 threads.
    Traceback (most recent call last):
      File "/home/kayvan/projects/prefect-demo/flows/spire_current_weather.py", line 65, in <module>
        spire_current_weather()
      File "/home/kayvan/projects/prefect-demo/flows/spire_current_weather.py", line 62, in spire_current_weather
        spire_pipe(obj, sample)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/flows.py", line 319, in __call__
        return enter_flow_run_engine_from_flow_call(self, parameters)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/engine.py", line 110, in enter_flow_run_engine_from_flow_call
        return anyio.run(begin_run)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
        return future.result()
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/client.py", line 95, in with_injected_client
        return await fn(*args, **kwargs)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/engine.py", line 158, in create_then_begin_flow_run
        flow_run = await client.create_flow_run(
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/client.py", line 538, in create_flow_run
        response = await self._client.post("/flow_runs/", json=flow_run_create_json)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/httpx/_client.py", line 1820, in post
        return await self.request(
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/httpx/_client.py", line 1506, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/prefect/client.py", line 233, in send
        response.raise_for_status()
      File "/home/kayvan/anaconda3/envs/prefect-conda/lib/python3.9/site-packages/httpx/_models.py", line 1510, in raise_for_status
        raise HTTPStatusError(message, request=request, response=self)
    httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'https://api-beta.prefect.io/api/accounts/2693bb0e-7537-4327-83f2-06354b5d2b4d/workspaces/d8a21cf4-12f7-45f8-9323-5fe529e45148/flow_runs/'
    For more information check: https://httpstatuses.com/500
    I get this while running a flow within a flow.
    k
    • 2
    • 13
  • s

    SULEMAN KHAN

    05/26/2022, 12:10 PM
    Hi, I am facing this problem. Issue:
    Scheduled Flow
    fails because task is unable to return value when
    Prefect Server
    runs the flow with
    CloudFlowRunner
    . Same Flow runs successfully when I use
    flow.run()
    . I have attached the code, a Prefect UI screenshot, and the terminal output of flow.run(). The if-else block at line 920 in
    task_runner.py
    is causing the issue. If I return a value from the
    function/task
    it runs the if block; otherwise it runs the else block. The line
    self.result.write(value, **formatting_kwargs)
    is calling the function with format
    self.
    and dictionary
    formatting_kwargs
    also contains the
    self
    variable, this self variable contains the object of
    Demo
    class.
    k
    • 2
    • 4
  • r

    Rei Mendel

    05/26/2022, 12:29 PM
    Hey, how can I map a flow of flows with dynamic parameters? Something like this (is it possible to get the parameters from a task?):
    parameters = ['~/folder1', '~/folder2', '~/folder3']
    create_flow_run.map(flow_name="List dir", parameters={"folder_path": ???})
    k
    • 2
    • 1

Kevin Kho

05/26/2022, 2:40 PM
Yes, you can get the parameters from a task. In this case, parameters has to be a list of dictionaries, one per mapped flow run, so you need to build it with this structure:
[{"folder_path": '~/folder1'}, {"folder_path": '~/folder2'}]
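A minimal sketch of building that structure from the original list of folders. The `build_run_parameters` helper is hypothetical. Inside a Prefect 1.x flow it would presumably be wrapped in a task so that its output can be mapped over, which is shown only in comments here so that the snippet runs without Prefect installed.

```python
from typing import Dict, List


def build_run_parameters(folders: List[str]) -> List[Dict[str, str]]:
    # One parameter dictionary per child flow run.
    return [{"folder_path": folder} for folder in folders]


folders = ['~/folder1', '~/folder2', '~/folder3']
run_parameters = build_run_parameters(folders)
print(run_parameters)

# Assumed usage in a Prefect 1.x flow (not executed here):
#     from prefect import unmapped
#     from prefect.tasks.prefect import create_flow_run
#     create_flow_run.map(
#         flow_name=unmapped("List dir"),
#         parameters=run_parameters,
#     )
```

Wrapping `flow_name` in `unmapped` keeps the same flow name for every run while `parameters` is iterated element by element.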