prefect-community
  • b

    Barbara Abi Khoriati

    11/04/2021, 2:24 PM
    Hi! I've mapped a task of mine, and some of the mapped runs had to retry. But after they successfully finished (the 'End Time' is shown in the Mapped Runs tab of the task), some of them didn't change their state and are still running. Is it a bug or should I just be patient? Thanks in advance! P.S.: this only happened to some of the ones that were retried.
    k
    a
    • 3
    • 9
  • m

    Martim Lobao

    11/04/2021, 4:24 PM
    i’ve got a task that depends on two other prior tasks, how can i get the child task to only start after both parents have finished (i wanted to combine the output of both tasks into a single arg instead of adding a new arg)? i tried using a simple boolean `and` (the output of each task is a bool), but it looks like prefect started the child task when the first parent finished instead of waiting for both:
    FIRST_TASK_DONE = first_task()
    SECOND_TASK_DONE = second_task()
    
    THIRD_TASK_READY = FIRST_TASK_DONE and SECOND_TASK_DONE
    third_task(third_task_ready=THIRD_TASK_READY)
    `third_task` was supposed to only start after both `first_task` and `second_task` had finished, but instead it only waited for the first one to complete
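A possible explanation, sketched in plain Python rather than in Prefect itself: the `and` keyword cannot be overloaded, so `a and b` evaluates to one of its two operands instead of producing an object that depends on both. `FakeTask` below is a hypothetical stand-in for a task result handle, not a Prefect class.

```python
class FakeTask:
    """Hypothetical stand-in for a task result handle (not a Prefect class)."""

    def __init__(self, name):
        self.name = name


first_done = FakeTask("first_task")
second_done = FakeTask("second_task")

# `and` returns one operand: the left one if it is falsy, otherwise the right one.
# Plain objects are truthy by default, so here the right operand is returned.
combined = first_done and second_done

# Only one of the two objects survives, so anything built from `combined`
# can reference at most a single parent.
print(combined.name)  # prints "second_task"
```

Under that assumption, passing both results to the child as separate arguments (or together in a list) would keep both parents visible to it.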
    k
    • 2
    • 10
  • b

    Bret Haueter

    11/04/2021, 5:02 PM
    Ran into a strange issue while clearing my Account data. I had set up a personal Team to prove out some use cases for Prefect and then found out my manager had set up a Team already for this purpose. While joined to both Teams, I went to the Account tab on my personal Team, cleared all data, and removed myself and all users from that Team. Everything seemed to work swimmingly, until I realized that all content I had personally created in either Team was deleted. This included a test Flow and a service API key I created for AWS ECS. Furthermore, I was now the only member of the Team my manager created, seeming to mean that purging the Account data actually did so for both Teams. Is this expected behavior or did I hit an odd bug?
    k
    n
    • 3
    • 2
  • e

    E Li

    11/04/2021, 5:11 PM
    Hi, I have os.path.join(path, folder) in my task, and I can run the flow with it from a Python script but not from the Prefect UI. The error message is: Unexpected error: TypeError('expected str, bytes or os.PathLike object, not NoneType',). Does anyone have any thoughts? Thanks in advance.
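The error text matches what `os.path.join` raises when one of its arguments is `None`. The sketch below reproduces that and adds a small guard that fails with a clearer message. `join_checked` is a hypothetical helper, and where the `None` comes from in the original flow (an unset parameter, a missing environment variable, and so on) is not known from the message.

```python
import os


def join_checked(path, folder):
    """Join two path parts, failing early with a clearer message if one is None."""
    if path is None or folder is None:
        raise ValueError(
            f"path parts must not be None: path={path!r}, folder={folder!r}"
        )
    return os.path.join(path, folder)


# Reproduce the reported failure: os.path.join rejects None.
try:
    os.path.join(None, "folder")
except TypeError as exc:
    print(f"TypeError: {exc}")

# With the guard, the failing argument is named in the error instead.
try:
    join_checked(None, "folder")
except ValueError as exc:
    print(f"ValueError: {exc}")
```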
    k
    a
    • 3
    • 70
  • p

    Philip MacMenamin

    11/04/2021, 5:31 PM
    prefect agent docker start
    results in
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff5ba1a23a0>: Failed to establish a new connection: [Errno 111] Connection refused'))
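"Connection refused" generally means nothing accepted the connection on the host and port in the error (here `localhost:4200`). A minimal standard-library sketch for checking that from the machine running the agent follows; `is_port_open` is a hypothetical helper, not part of Prefect.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Host and port are taken from the error message above.
    print(is_port_open("localhost", 4200))
```

If this prints `False`, the API the agent is trying to reach is probably not running or not reachable at that address.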
    k
    a
    • 3
    • 24
  • c

    Chris Leber

    11/04/2021, 7:13 PM
    Hi All. Having some trouble supplying a Prefect Parameter value to a BatchSubmit task. See error trace in thread.
    batch = BatchSubmit(
        job_name="test",
        job_definition="run_test",
        job_queue="test-queue",
        boto_kwargs={},
    )
    
    with Flow(
        "test Batch submit",
        storage=S3(bucket="prefect-storage"),
        run_config=ECSRun(task_definition=task_definition),
    ) as flow:
        s3_path = Parameter(
            "s3_path", default="s3://data/TEST"
        )
        file_suffix = Parameter("file_suffix", default=".txt")
        array_size = Parameter("array_size", default=10)
    
        batch.run(
            batch_kwargs={
                "arrayProperties": {"size": array_size},
                "parameters": {"query_s3_path": s3_path, "file_suffix": file_suffix,},
            }
        )
    j
    • 2
    • 3
  • a

    Arun Giridharan

    11/04/2021, 8:13 PM
    If I'm running the same task twice in the same flow, is there a way to use `upstream_tasks` to differentiate them?
    k
    • 2
    • 2
  • f

    Fina Silva-Santisteban

    11/04/2021, 8:28 PM
    @Kevin Kho @Anna Geller I’ve created a new api key for one of our service accounts using the `Create API Key` option in prefect cloud’s `Service Account` page. When I use that for flow registration I get this warning:
    /opt/hostedtoolcache/Python/3.9.7/x64/lib/python3.9/site-packages/prefect/client/client.py:175: UserWarning: Client was created with an API token configured for authentication. API tokens are deprecated, please use API keys instead.
    I set the auth token/key in a python file like this:
    client = prefect.Client(
            api_token='secret value'
        )
    client.save_api_token()
    The docs only seem to show how to set `auth_api_key` when using the prefect cli or a graphql query. Does this mean that when using the api you’re still using the deprecated token? (I’ve peeked into prefect’s implementation for the `Client` class and everything still says `api_token`, but that might be a red herring!)
    k
    • 2
    • 3
  • d

    Derek Heyman

    11/05/2021, 5:07 AM
    I'm having issues using the python client to trigger a flow. I provided the api_key and the tenant_id to the client and also tried without these inputs as I figured they would be picked up from my environment. Any ideas?
    prefect.exceptions.AuthorizationError: [{'path': ['create_flow_run'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}]
    k
    • 2
    • 37
  • i

    Igor Adamski

    11/05/2021, 10:11 AM
    Hi! I’m coming across a very weird behaviour, I’m running a flow where I first map a task and then collect all the mapped runs and bind them into the next task, something like this:
    data_downloader = DataDownloader(b_config=db_config)
    downloaded_data = data_downloader.map(tickers_and_dates=grouped_list,
                                              columns=unmapped(columns)
                                              )
    
    save_data = SaveData(db_config=db_config,
                             override_exists_files=True)
    save_data.bind(data=downloaded_data, 
                  table_name='tmp_events')
    I’m getting very weird behaviour, where some children of the mapped task start running again after completion. As a result, the `SaveData` task starts running before all the mapped children of the `DataDownloader` task have finished. See screenshots attached in the thread, it would be great if someone could offer any guidance on this. On the second screenshot, the top bar is the `DataDownloader` bar and you can see that the `SaveData` bar (just below) starts before the `DataDownloader` one is finished
    a
    • 2
    • 19
  • k

    Klemen Strojan

    11/05/2021, 11:25 AM
    Hey all - Can I mark a certain flow as skipped if it takes longer than some predetermined time? For example, a flow takes 3 min on average, but sometimes it hangs in a running state for a long time before it fails. If a flow takes more than 5 min I want to mark it as skipped. Cheers
    a
    • 2
    • 2
  • a

    Adam Everington

    11/05/2021, 1:49 PM
    Secrets! Hey all, I'm struggling to understand where I'm going wrong. I have a Prefect server set up and I have secrets in my local config.toml which contain things like usernames/passwords and some keys. I want to access these during flow execution by doing something like: my_secret = PrefectSecret('my_secret_name'). I keep getting error messages. When I try to debug locally using a LocalExecutor(), it says secrets should only be retrieved during a flow run, not while building a flow. I've currently got my secret retrieval in my flow definition, is this wrong?
    k
    a
    • 3
    • 61
  • m

    Martin Goldman

    11/05/2021, 2:49 PM
    Hi gang! Silly question here: does anybody use Prefect on a team of non-Python engineers where the task code won’t be Python? The centralized-workflow-but-distributed-execution model is what I’m looking for, and I was trying to fashion a janky way of bolting that onto another product before finding this offering.
    k
    • 2
    • 2
  • v

    Vamsi Reddy

    11/05/2021, 2:51 PM
    Hi Everyone, I tried to use the S3 storage for my flow. I am running into an error
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'utils\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    I am able to run the flow locally but i get the error when i trigger it from the prefect cloud.
    k
    a
    • 3
    • 9
  • d

    Dominik Prester

    11/05/2021, 5:56 PM
    Hello everyone, I’ve managed to set up Prefect locally on k8s with a k8s agent and a Dask executor using the KubeCluster cluster class. Is this the best configuration for utilising k8s cluster resources? Also, I can’t see task logs in the UI when using this setup, and I’m guessing this is due to the workers being managed by the Dask scheduler. Is there anything to do about this?
    k
    • 2
    • 2
  • m

    Marwan Sarieddine

    11/05/2021, 6:49 PM
    Hi folks - a question about prefect cloud task version locking
    a
    k
    a
    • 4
    • 16
  • v

    Vamsi Reddy

    11/05/2021, 9:12 PM
    ok so for some reason my dashboard on prefect cloud no longer shows me the real-time status of the tasks it's running, although the flow is running on my local machine when triggered from cloud. I also tried clearing the cookies and logging back in
    ✅ 1
    n
    j
    • 3
    • 9
  • i

    itay livni

    11/06/2021, 3:03 PM
    Hi - A couple of questions (1) Where is the best place to ask `orion` questions/bugs? (2) Is there any flow visualization in `orion`? I could not find anything in the docs. Thanks.
    a
    j
    • 3
    • 3
  • j

    Jack Sundberg

    11/06/2021, 4:15 PM
    I don't have any experience with GraphQL so I'm struggling to format my query. How would I get a count of running/submitted flow runs for a specific flow name? I'm a django orm user, so I know it would look like this:
    nruns = FlowRuns.objects.filter(flow__name="MyExampleFlow", state__in=["Running", "Submitted"]).count()
    But for graphql... I'm kinda lost. And how would I format this in python for the `client.query()` method?
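One way to approach this is to build the query string in Python and hand it to the client. The sketch below only builds the string. The field names (`flow_run_aggregate`, `where`, `_eq`, `_in`, `aggregate { count }`) are assumptions about a Hasura-style schema and should be checked against the actual API schema; `build_flow_run_count_query` is a hypothetical helper.

```python
import json


def build_flow_run_count_query(flow_name, states):
    """Build a GraphQL query string that counts flow runs by flow name and state.

    All field names are assumptions about a Hasura-style schema.
    """
    # json.dumps yields double-quoted, escaped literals that GraphQL accepts.
    name_literal = json.dumps(flow_name)
    states_literal = json.dumps(list(states))
    where = (
        "{flow: {name: {_eq: " + name_literal + "}}, "
        "state: {_in: " + states_literal + "}}"
    )
    return (
        "query {\n"
        "  flow_run_aggregate(where: " + where + ") {\n"
        "    aggregate { count }\n"
        "  }\n"
        "}"
    )


query = build_flow_run_count_query("MyExampleFlow", ["Running", "Submitted"])
print(query)
```

The resulting string mirrors the Django filter above: one condition on the related flow's name and one on the run's state.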
    k
    • 2
    • 6
  • i

    itay livni

    11/07/2021, 3:21 AM
    Hi - In orion is it possible to have tasks that are classes? i.e. `MyClass(Task)...`
    a
    • 2
    • 1
  • g

    Greg Adams

    11/07/2021, 6:10 PM
    Does anyone know if there's a tutorial on setting up a docker agent to use Google Cloud Run for running flows?
    k
    s
    • 3
    • 10
  • b

    Bastian Röhrig

    11/08/2021, 10:26 AM
    Hey everyone, we have a docker agent running on a cloud vm for our production workloads. We need to automatically restart the docker agent on reboot. Is there a recommended way to make that happen? I stumbled across the https://docs.prefect.io/orchestration/agents/local.html#using-with-supervisor supervisor feature for local agent and am thinking about adapting this for our use case. However, I do not know enough of Supervisor to know if that would be the right approach.
    a
    e
    a
    • 4
    • 6
  • a

    ale

    11/08/2021, 11:13 AM
    Hey folks 🙂 We recently started using `create_flow_run` and `wait_for_flow_run` to replace the (legacy?) `StartFlowRun`. The FlowRun doc page warns that this feature is experimental and may be subject to change in the future. Would it be possible to know which parts of the module can be considered stable and which are under review/refactoring? This would be super helpful for us to understand how to approach future upgrades of our Prefect Core deployment. cc @Kevin Kho @Michael Adkins @grandimk
    a
    • 2
    • 3
  • a

    Aqib Fayyaz

    11/08/2021, 11:38 AM
    1. contents = task(command='feast apply')
    2. materialize = task(command='feast materialize 2021-11-08 2021-11-09')
    3. getFeatures()
    These are the three tasks in my flow that I want to run in the sequence given above. The problem, I think, is that it runs the first task (feast apply), then runs the third task getFeatures(), and only after that comes back to task 2 for materialize. I think it does not wait for the second task to finish before starting the third.
    a
    • 2
    • 10
  • c

    Chris Arderne

    11/08/2021, 12:29 PM
    Documentation for `upstream_tasks` is a bit unclear (to me). The following code:
    @task()
    def task_a():
        ...
    
    @task()
    def task_b(param):
        ...
    
    @task()
    def task_c():
        ...
    
    with Flow("example") as flow:
        a = task_a()
        task_b(a)
        task_c(upstream_tasks=[task_b])
    produces a DAG as follows:
    task_a -> task_b
    task_b -> task_c
    But I want `task_a -> task_b -> task_c`, i.e. `task_b` must only run once, doesn't return anything, and must complete before `task_c` starts. To achieve this I've just assigned a fake return variable and passed that to `upstream_tasks` as follows:
    with Flow("example") as flow:
        a = task_a()
        b = task_b(a)
        task_c(upstream_tasks=[b])
    Is this the best way to do this?
    s
    a
    a
    • 4
    • 5
  • m

    Mike Lev

    11/08/2021, 12:39 PM
    is this relevant- can I override a serializer task param for numpy arrays? https://github.com/PrefectHQ/prefect/issues/2639#issuecomment-635002663
    a
    k
    • 3
    • 4
  • f

    Filip Lindvall

    11/08/2021, 1:32 PM
    We started getting a lot of "late runs", our k8s setup is running other jobs with no issue, we can manually start the same flows and they run. Only scheduled jobs become "late". No labels that block, universal runner too. Can I get some help here? @Kevin Kho?
    a
    z
    a
    • 4
    • 36
  • m

    Marko Herkaliuk

    11/08/2021, 4:58 PM
    Hi everyone! I want to make personalized notifications about the Flow. Currently, this is configured through Automations, and all Failed statuses arrive in one Slack channel. Depending on the flow, I would like the message to mention the person responsible for that Flow. I thought maybe I could somehow set the required variable (Slack member ID) through the context and get it in the message, like we get the flow-name or run-name of the Flow. However, I’m not sure that can be implemented within the current functionality. Creating webhooks at the level of each Flow seems like overhead.
    k
    • 2
    • 6
  • d

    Dotan Asselmann

    11/08/2021, 5:59 PM
    hey how do i enable --expose when using helm chart?
    a
    • 2
    • 6
  • b

    Braun Reyes

    11/08/2021, 7:29 PM
    Wanted to gauge any interest in the following enhancement to the ShellTask. We were thinking that there would be value in having the ability to access context like task_name, flow_name, etc.
    k
    • 2
    • 9
k

Kevin Kho

11/08/2021, 7:32 PM
Hey @Braun Reyes, I’m having a hard time picturing what you would do. Are you trying to use context info in the command of the ShellTask?
b

Braun Reyes

11/08/2021, 7:34 PM
so in dbt I can reference envars in my dbt_project.yml and other places. This means I could potentially make use of the underlying prefect task/flow metadata.
for us the use case was isolating dbt artifacts when you are running many dbt run commands in a single flow.
or you want to have prefect flow/task run data available in the artifacts themselves.
you can try to pass them in as envars directly in the current shell task, but it does not look like they are lazily evaluated that way.
k

Kevin Kho

11/08/2021, 7:37 PM
Did you try passing them in the `run` method? I think that would lazily evaluate it
b

Braun Reyes

11/08/2021, 7:38 PM
oh...I did not
I will try that
thanks
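The idea discussed in this thread, exposing run metadata to a shell command as environment variables that are resolved when the command runs, can be sketched with the standard library alone. This is not the ShellTask API: `run_with_context_env`, the `PREFECT_CTX_` prefix, and the example context values are all hypothetical.

```python
import os
import subprocess
import sys


def run_with_context_env(command, context):
    """Run a command with context values exposed as environment variables.

    The PREFECT_CTX_ prefix is a hypothetical naming choice for this sketch.
    """
    env = dict(os.environ)
    for key, value in context.items():
        env[f"PREFECT_CTX_{key.upper()}"] = str(value)
    result = subprocess.run(
        command, env=env, capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


# The context dict is read when the command runs, not when it is defined,
# which is the "lazy evaluation" behaviour discussed above.
context = {"flow_name": "dbt_flow", "task_name": "dbt_run"}
child = [
    sys.executable,
    "-c",
    "import os; print(os.environ['PREFECT_CTX_FLOW_NAME'])",
]
print(run_with_context_env(child, context))  # prints "dbt_flow"
```

A tool such as dbt could then read those variables from its own configuration, which is the artifact-isolation use case described earlier in the thread.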