prefect-community
  • j

    Jacob Goldberg

    08/02/2022, 5:44 PM
    Hi everyone. I started receiving an error message for all flows:
    404 Client Error for http+docker://localhost/v1.41/containers/create?name=elastic-salamander: Not Found ("No such image: 26123096XXXX.dkr.ecr.us-east-1.amazonaws.com/cal_val_etl_flows:latest-165946XXXX")
    I can go on AWS ECR, see the container, and see that the URI matches the one from the error, so it does exist… I have changed some settings recently regarding the Python version and Docker build, but cannot understand why I would get a container-not-found error. edit: formatting
    y
    • 2
    • 2
  • h

    haris khan

    08/02/2022, 5:51 PM
    while running the deployment from the UI in Prefect 2.0.1, although the directory exists
    a
    a
    • 3
    • 5
  • c

    Chu

    08/02/2022, 6:36 PM
    Which syntax can I use to get the current flow run's parameter values?
    ✅ 1
    a
    • 2
    • 8
  • m

    Mansour Zayer

    08/02/2022, 8:50 PM
    Hello. In Prefect 1: Context: we have a flow of flows, and to manage dependencies we have two solutions: either make sure the infra registers the flows in order, or import and register the underlying flows in the flow-of-flows. What could go wrong if you register a flow in the flow-of-flows like this:
    from underlying_flows.flow_a import flow_a
    
    flow_a.register(project...)
    
    with Flow(...):
        ...
    Because when we do that, prefect throws a warning
    UserWarning: Attempting to call "flow.register" during execution of flow file will lead to unexpected results.
    👀 1
    b
    • 2
    • 3
  • j

    John Mizerany

    08/02/2022, 9:19 PM
    For launching an ECS instance via the
    --cli-input-json
    flag for the CLI, should there be an extra argument in the
    prefect-agent-td.json
    file called
    agentRoleArn
    ? There are two for
    executionRoleArn
    and
    taskRoleArn
    but if the agent itself needs a Role in AWS should that be specified or can it be included in the other variables?
    m
    • 2
    • 8
  • b

    Billy McMonagle

    08/02/2022, 9:25 PM
    Hello! I have a 2.0 question about registering and using blocks, specifically infrastructure blocks.
    ✅ 1
    a
    • 2
    • 9
  • h

    Hafsa Junaid

    08/02/2022, 9:48 PM
    Following these links for running prefect sample flow on cloud 2.0 : https://orion-docs.prefect.io/ui/cloud-getting-started/ + https://orion-docs.prefect.io/concepts/storage/ Executed this command:
    prefect deployment build ./sample.py:samplefunc --name "ALS Deployment" --storage-block recommenderprefectserver
    Error: Deployment.yaml is not created with following exception
    ✅ 1
    t
    a
    • 3
    • 5
  • t

    Tom Kaszemacher

    08/02/2022, 10:02 PM
    Hi all! I could use some help here. I’m trying to query a DB to get a collection of inputs to process, and to dispatch each input to a sub-flow:
    @task()
    def get_inputs() -> List:
        # This task retrieves a list of inputs to process from a DB and returns it
        pass
    
    
    with Flow(name='dispatcher') as dispatcher:
    
        inputs = get_inputs()
    
        for i in inputs:
            create_flow_run('worker', parameters={'input': i})
    It doesn’t work as I need to provide either nouts or return a Tuple in order to iterate over my inputs, however I don’t know the size of it. What would be a correct approach here? Thanks!
    ✅ 1
    m
    n
    • 3
    • 5
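One possible approach to the question above in Prefect 1, sketched under assumptions: because the number of inputs is unknown when the flow is built, the per-input parameter dictionaries can be produced at run time and then mapped over instead of iterated with a `for` loop. The helper name `build_worker_parameters` is hypothetical, and the Prefect wiring is shown only as a comment because it has not been run against a Prefect backend here.

```python
from typing import Any, Dict, List


def build_worker_parameters(inputs: List[Any]) -> List[Dict[str, Any]]:
    """Turn a list of inputs into one parameters dict per child flow run."""
    return [{"input": value} for value in inputs]


# Assumed Prefect 1 wiring (not executed here): wrap the helper as a task
# and map create_flow_run over its result inside the flow, for example
#   params = build_worker_parameters(get_inputs())
#   create_flow_run.map(flow_name=unmapped("worker"), parameters=params)
```

Each element of the returned list would become the `parameters` argument of one child flow run, so the list length never needs to be known in advance.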
  • a

    André Bonatto

    08/02/2022, 10:11 PM
    Hi! I'm trying to specify result location per task in Orion but I can't find any documentation on the issue. Was the task result functionality removed in Orion?
    ✅ 1
    a
    • 2
    • 4
  • h

    Hafsa Junaid

    08/02/2022, 10:54 PM
    Prefect Agent and Orion are both up, yet the flow on Cloud 2.0 is only scheduled and stuck in the Late state
    ✅ 1
    j
    j
    m
    • 4
    • 18
  • c

    CA Lee

    08/03/2022, 2:14 AM
    Hello, is there a way to render markdown written in deployment.yaml:description in the Prefect Orion Cloud UI?
    ✅ 1
    m
    • 2
    • 3
  • h

    Hafsa Junaid

    08/03/2022, 2:45 AM
    Do we have docker-compose for prefect 2.0 ?
    👀 1
    🙌 1
    ✅ 1
    s
    j
    • 3
    • 2
  • h

    Hafsa Junaid

    08/03/2022, 2:48 AM
    How can we logout of prefect cloud:2.0 and switch back to prefect UI? I am running Orion start command but queue is created on cloud
    ✅ 1
    r
    o
    +2
    • 5
    • 6
  • w

    wonsun

    08/03/2022, 3:18 AM
    Hi all~ I'm using Prefect 1.0. Is there any method to trigger the flow to execute immediately when a certain task is finished in the app we developed? NOT a trigger from the task.
    ✅ 1
    b
    • 2
    • 1
  • r

    Rajvir Jhawar

    08/03/2022, 7:07 AM
    Version:             2.0.2
    API version:         0.8.0
    Python version:      3.9.13
    I'm having problems with both the
    read_flow_by_name
    and
    read_deployment_by_name
    . Neither of these methods works when I try to use them. Sample code:
    from prefect.client import get_client
    import asyncio
    import json

    async def main():
        async with get_client() as client:
            data = await client.read_flow_by_name(flow_name='test_flow')

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    Error:
    Traceback (most recent call last):
    File
    data = await client.read_flow_by_name(flow_name='test_flow')
    /.venv/lib/python3.9/site-packages/prefect/client.py", line 508, in read_flow_by_name
    return schemas.core.Deployment.parse_obj(response.json())
    File "pydantic/main.py", line 521, in pydantic.main.BaseModel.parse_obj
    File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
    pydantic.error_wrappers.ValidationError: 2 validation errors for Deployment
    flow_id
    field required (type=value_error.missing)
    manifest_path
    field required (type=value_error.missing)
    ✅ 1
    r
    • 2
    • 6
  • v

    Viet Nguyen

    08/03/2022, 7:12 AM
    Hi all, I'm having a look at prefect-email and wonder if there is a better way to securely store the password than hard-coding it into the Python scripts? https://github.com/PrefectHQ/prefect-email Thank you 🙂
    ✅ 1
    m
    • 2
    • 13
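A generic way to keep the password out of the script, sketched under assumptions: read it from an environment variable at run time. This is plain Python and not part of the prefect-email API; the variable name `EMAIL_PASSWORD` and the function name are hypothetical.

```python
import os


def get_email_password(var_name: str = "EMAIL_PASSWORD") -> str:
    """Read the password from an environment variable instead of the script."""
    password = os.environ.get(var_name)
    if not password:
        # Fail loudly rather than sending mail with an empty password.
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return password
```

The returned value can then be passed wherever the script previously used the hard-coded string.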
  • o

    Oscar Björhn

    08/03/2022, 8:15 AM
    Is anyone else on 2.0.2 yet? I'm getting an interesting error when I try to deploy using a remote storage block (seems to work with a local file system).
    Code:
    prefect deployment build orchestration/flows/test_cleansed.py:default -n test_cleansed -t dev --storage-block azure/azure-dev --infra docker-container -o test_cleansed-deployment.yaml
    Error:
    TypeError: put_directory() got an unexpected keyword argument 'ignore_file'
    on line 470 of cli/deployment.py. Judging by the source code on GitHub, it seems like the overload of put_directory() for the Azure storage block specifically omits the ignore_file argument. Perhaps these two features were not tested together, unless I'm missing something. I'll give people an hour to say that I'm doing something wrong, otherwise I'll create an issue (or a PR, might be an easy fix).
    ✅ 1
    a
    m
    • 3
    • 16
  • t

    T B

    08/03/2022, 10:04 AM
    Hi all, (working on prefect 2.0.2) I am running a deployment flow on scheduled interval. I have 2 agents listening on the same work-queue and grabbing this flow. One agent gets an error : prefect.exceptions.Abort: This run has already terminated. ---> The flow is considered "Crashed". Is it the expected behavior ? How is it possible to split the load on multiple agents for Scheduled deployments ? Cheers
  • a

    Alvaro Durán Tovar

    08/03/2022, 10:14 AM
    I'm trying to find how to use kubernetes tasks on prefect 2, is that possible? (I mean same as in prefect 1)
    ✅ 1
    a
    a
    • 3
    • 5
  • h

    haris khan

    08/03/2022, 10:17 AM
    getting server timeout when running the deployment from cloud ui
    10:09:41.192 | DEBUG | prefect.agent - Checking for flow runs...
    10:09:46.444 | DEBUG | prefect.agent - Checking for flow runs...
    10:09:51.687 | DEBUG | prefect.agent - Checking for flow runs...
    10:09:56.936 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:02.188 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:07.439 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:12.683 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:18.003 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:23.248 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:28.530 | DEBUG | prefect.agent - Checking for flow runs...
    10:10:28.709 | INFO | prefect.agent - Submitting flow run '62df0689-3a3d-4bb0-bcaa-a16fb215df67'
    10:10:29.536 | ERROR | prefect.agent - Flow runner failed to submit flow run '62df0689-3a3d-4bb0-bcaa-a16fb215df67'
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/agent.py", line 169, in submit_run
        await self.task_group.start(submit_flow_run, flow_run, infrastructure)
      File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 701, in start
        return await future
      File "/usr/local/lib/python3.8/site-packages/prefect/infrastructure/submission.py", line 48, in submit_flow_run
        return await infrastructure.run(task_status=task_status)
      File "/usr/local/lib/python3.8/site-packages/prefect/infrastructure/kubernetes.py", line 201, in run
        job_name = await run_sync_in_worker_thread(self._create_job, manifest)
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
      File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
        return await future
      File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
        result = context.run(func, *args)
      File "/usr/local/lib/python3.8/site-packages/prefect/infrastructure/kubernetes.py", line 408, in _create_job
        job = batch_client.create_namespaced_job(self.namespace, job_manifest)
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
        return self.create_namespaced_job_with_http_info(namespace, body, **kwargs) # noqa: E501
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
        return self.api_client.call_api(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
        return self.__call_api(resource_path, method,
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
        response_data = self.request(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 391, in request
        return self.rest_client.POST(url,
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 275, in POST
        return self.request("POST", url,
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 234, in request
        raise ApiException(http_resp=r)
    kubernetes.client.exceptions.ApiException: (500)
    Reason: Internal Server Error
    HTTP response headers: HTTPHeaderDict({'Audit-Id': 'c94033e6-af19-414c-8c50-72811876a885', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '1133f237-04af-4269-a466-2f5eb69e6670', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'e6f943c5-ae55-45d1-ab0e-cb0161b8bd0e', 'Date': 'Wed, 03 Aug 2022 10:10:29 GMT', 'Content-Length': '264'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"The POST operation against Job.batch could not be completed at this time, please try again.","reason":"ServerTimeout","details":{"name":"POST","group":"batch","kind":"Job"},"code":500}
    m
    a
    • 3
    • 3
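The response body above asks the caller to "please try again". A minimal sketch of what retrying a transient failure with exponential backoff could look like in plain Python follows; it is not how the Prefect agent handles this error, and the helper name `retry` and its parameters are hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(call: Callable[[], T], attempts: int = 3, base_delay: float = 1.0) -> T:
    """Call `call` until it succeeds, sleeping base_delay * 2**n between tries."""
    for attempt in range(attempts):
        try:
            return call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

In practice the `except` clause would be narrowed to the specific transient error, so that genuine misconfiguration still fails fast.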
  • s

    Shrikkanth

    08/03/2022, 11:34 AM
    Hi people, like the ShellTask function in Prefect v1 (https://docs-v1.prefect.io/api/latest/tasks/shell.html#shelltask), is there any way to call a bash script in Prefect 2.0 using another function? I can't find any documentation for it. Any suggestions?
    ✅ 1
    a
    • 2
    • 2
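One option for the question above, sketched under assumptions: since Prefect 2.0 tasks are ordinary Python functions, a shell command can be run with the standard library's `subprocess` module. The function name `run_shell` is hypothetical, and decorating it with Prefect's `@task` is an assumption that is not exercised here.

```python
import subprocess


def run_shell(command: str) -> str:
    """Run a shell command and return its stdout with trailing whitespace stripped."""
    completed = subprocess.run(
        command,
        shell=True,           # let the shell parse the command string
        check=True,           # raise CalledProcessError on a non-zero exit code
        capture_output=True,  # collect stdout and stderr instead of printing
        text=True,            # decode bytes to str
    )
    return completed.stdout.rstrip()
```

Calling `run_shell("bash my_script.sh")` (script name hypothetical) would execute the script and raise if it exits with an error.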
  • a

    Amol Shirke

    08/03/2022, 12:43 PM
    Hi, how do I make an SSL connection when connecting to a Postgres DB?
    ✅ 1
    a
    • 2
    • 5
  • a

    Amol Shirke

    08/03/2022, 12:44 PM
    I know asyncpg has a way to pass sslmode and an SSL certificate
    ✅ 1
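A small sketch of the asyncpg side of this, under assumptions: `asyncpg.connect` accepts an `ssl` argument, which can be an `ssl.SSLContext` built with the standard library. How Prefect itself forwards such a setting to its database connection is not shown here; the helper name and the certificate path in the comment are hypothetical.

```python
import ssl
from typing import Optional


def build_ssl_context(ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Create a client-side SSL context that verifies the server certificate."""
    # With ca_file=None the system's default CA bundle is used.
    return ssl.create_default_context(cafile=ca_file)


# Assumed usage with asyncpg (not executed here):
#   conn = await asyncpg.connect(dsn, ssl=build_ssl_context("/path/to/ca.pem"))
```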
  • p

    Paul Lucas

    08/03/2022, 12:45 PM
    Hi, I’ve managed to get a POC for Prefect up and running in Heroku, but the issue I have now is that when I run the flow that runs dbt, it doesn’t appear to pick up the env var for the dbt target and instead falls back on the dev one. I’m currently creating a task like so
    dbt_shell_task = DbtShellTask(
        profiles_dir=str(DBT_PROFILES_DIR),
        environment=str(DBT_TARGET),
        overwrite_profiles=False,
        helper_script=f"cd {DBT_ROOT}",
        return_all=True,
        stream_output=True,
        state_handlers=[post_to_slack],
    )
    If I run dbt outside Prefect, i.e.
    dbt build --target=$DBT_TARGET
    it works as expected. Am I using the wrong variable for
    target
    ?
    a
    • 2
    • 5
  • s

    Scarlett King

    08/03/2022, 1:18 PM
    Hi guys, has anyone done testing for mapped flow? I'm trying to write a test to verify if a task is available in the flow but I can't seem to access the tasks that are inside the mapped flow
    t
    a
    • 3
    • 13
  • b

    Benjamin.bgx

    08/03/2022, 1:44 PM
    Hi everyone, with Prefect 2.0: is it possible to use prefect.infrastructure to run a Docker container on Azure outside the agent? The VM where I installed the agent has 1 GB of RAM, but I want to set up a Docker task (like Fargate on AWS) with more resources to execute the flow. Or should I go straight to Kubernetes? Has anyone tried Kubernetes on Azure for this?
    c
    • 2
    • 5
  • m

    Matt Delacour

    08/03/2022, 2:30 PM
    Hi all, How can I get more descriptive names of tasks like "create_flow_run" & "wait_for_flow" ? Otherwise in prefect cloud it's hard to understand which child flow started or not ...
    m
    n
    • 3
    • 3
  • r

    Rio McMahon

    08/03/2022, 2:41 PM
    I am running flows on prefect 1.0 with an ECS Agent. The flow is successfully scheduled/submitted then gets stuck in that state. I looked on ECS and it looks like the container starts then immediately dies. There aren’t any associated logs that I can find. Any recommendations on how to debug this? EDIT - ended up being a 1.0 agent/2.0flow compatibility issue. For brevity the ECS container stop reason was
    essential container in task exited
    with no generated logs. The entrance command
    ["/bin/sh", "-c", "prefect execute flow-run"]
    was failing because (I think) prefect 2.0 doesn’t have an
    execute
    command. I resolved this by pinning my prefect dependency to
    pip install "prefect[aws, gitlab]==1.3.0"
    .
    b
    t
    • 3
    • 5
  • r

    Rajvir Jhawar

    08/03/2022, 2:55 PM
    Prefect: 2.0.2. I added the "version" parameter to my flow, but it doesn't show up in the UI. When I retrieve the flow model, the version is not included either.
    from prefect import flow, get_run_logger
    
    @flow(name="test", version="1.0beta")
    def main():
        logger = get_run_logger()
        logger.info(f"Hello World")
    
    if __name__ == "__main__":
        main()
    Flow model details:
    id=UUID('120dea46-79e8-4f42-99f0-7e300a2b3cb9') name='test' tags=[]
    This is a big blocker for me as I want to use the
    create_flow_run
    method, but it errors out after you give it a flow model without a version in it. Any suggestions on what I am doing wrong here?
    k
    • 2
    • 12
  • c

    Chu

    08/03/2022, 3:13 PM
    hi, wanna ask a question if someone may know or have some clues 😉 Appreciate for any help!🙌 In a flow run, we defined some parameters using
    MyParams = Parameter('My_Param', default="my_input")
    , in Prefect UI we can input some parameters to overwrite the default at run time, my question is: is there a way to get the run time parameter input (not the MyParams object, but the actual input)? In our case, the
    default
    is a list (where we pass a list of ids), we need to customize the flow run name by extracting each id from
    MyParams
    and formalizing a new list (child flow run name) for
    map
    function to work. From what I understand and tested,
    MyParams
    is a serialized object; to access its input we first need to deserialize it or use another way. We cannot just loop over it using
    [i+'_customzied_string' for i in MyParams]
    a
    b
    • 3
    • 13
a

Anna Geller

08/03/2022, 4:12 PM
I encourage you to check examples on Discourse
e.g. https://discourse.prefect.io/t/how-to-map-over-flows-with-various-parameter-values/365
b

Blake Enyart

08/03/2022, 8:13 PM
Hey @Anna Geller, So I've been digging through some of these threads and not having much luck and I think I'm trying to address a similar problem as @Chu here. I'm trying to get the value from a Parameter and use it in the Prefect flow in version 1.2.4. With that, I'm seeing the documentation here which indicates the syntax that you mentioned in the other thread of
prefect.context.parameters.get("<parameter name>")
. However, when I try to use that syntax (in a flow which is triggered by the Prefect UI and run via a Universal Run agent) I am getting the error that the
.parameters
attribute is not found as part of the
.context
module. I see the documentation which indicates it should be there on the context at runtime, but here is a screenshot attached to this message that shows the intellisense code completion options I have in VS Code with the parameters function missing. Is there something I and @Chu might be missing in all this where the goal is to directly access the list used in the run in the rest of the code (i.e. if I run the flow with the defaults it grabs that and uses it in the rest of the code, but if I pass in a different value at runtime to the Prefect flow it will use the updated values in the rest of the code instead of the default values)
c

Chu

08/03/2022, 8:18 PM
I think we are all trying to iterate over a Parameter() after it is created. I found a great explanation of the nuances from Prefect's CTO: https://stackoverflow.com/questions/64155793/is-it-possible-to-loop-over-a-prefect-parameter
a

Anna Geller

08/03/2022, 8:36 PM
I may not have emphasized that, but passing data from parameter values works the same way as passing any data dependencies between tasks
b

Blake Enyart

08/03/2022, 8:50 PM
So if I wanted to pass the data from the Parameter to the python variable
tableau_online_datasources
do I do
tableau_online_datasources.result.value
?
a

Anna Geller

08/04/2022, 9:35 AM
No, you would pass it as any Python object in functional programming
c

Chu

08/04/2022, 12:08 PM
But can we loop over a Parameter object? Like
with Flow("My flow") as flow:
    urls = Parameter("urls", default=['url1', 'url2', 'url3'])
    result = [url+'_string' for url in urls]
This gives an error saying that Task is not iterable. I understand urls is a serialized object; is there a way to deserialize it to retrieve the run-time value (which may be something different from the default)?
a

Anna Geller

08/04/2022, 12:11 PM
No looping is allowed in 1.0 unless you use mapping. I believe that you should switch to 2.0 as soon as possible; the 1.0 DAG requirement seems super confusing to you and it's no longer sth to worry about in 2.0
c

Chu

08/04/2022, 12:26 PM
I understand, but just to confirm: can mapping in Prefect 1.0 do the above? I tried different methods with mapping and none worked. Would you mind showing me the code? We are in a slow switch; the legacy system still needs 1.0 to deliver some important data pipelines.
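A minimal sketch of the mapping idea discussed above, under assumptions: in Prefect 1 a `Parameter` is only resolved at run time, so the string manipulation has to live in a function that runs as a task rather than in a list comprehension at flow-build time. The function name `customize_name` and the suffix are hypothetical, and the Prefect 1 wiring is shown only as a comment because it has not been verified here.

```python
def customize_name(item_id: str, suffix: str = "_customized_string") -> str:
    """Build one child flow run name from a single id."""
    return item_id + suffix


# Assumed Prefect 1 usage (not executed here):
#   from prefect import Flow, Parameter, task
#   customize = task(customize_name)
#   with Flow("My flow") as flow:
#       urls = Parameter("urls", default=["url1", "url2", "url3"])
#       names = customize.map(urls)  # one mapped task run per list element
```

Outside Prefect the same function can be checked directly, for example with `[customize_name(u) for u in ["url1", "url2"]]`.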
b

Blake Enyart

08/04/2022, 5:25 PM
@Anna Geller, so we can loop over parameter object values in 2.0? We are in a position to make the migration, but just wanted to check first if there is documentation you could direct me to on this? Just did a quick search in the 2.0 docs and nothing jumps out initially in the examples that addresses this use case.
a

Anna Geller

08/04/2022, 5:33 PM
I didn't mean that 2.0 addresses any specific use case like that. I meant that in 2.0, it's easy to build such dynamic logic that dynamically combines parameter values in string manipulations and dynamically looping over things, as you mentioned, since 2.0 is essentially just Python with extra observability
✅ 1
b

Blake Enyart

08/04/2022, 5:43 PM
Awww...gotcha. That is helpful context. Thank you 👍
🙌 1