Powered by Linen
prefect-community

    Anze Kravanja

    10/01/2021, 10:00 PM
    Hi! This might have been raised before. I’m running the latest version (0.15.6) on a virtual machine. It works on localhost, but is not reachable via external IP. If I try the same with version 0.15.0, everything works as expected. Is this new behaviour, and do we need to do more to make it work like before? Thank you!

    Tim Finkel

    10/01/2021, 10:35 PM
    I’m new to Prefect and have been attempting to run a flow using ECS. Unfortunately, my containers keep stopping immediately, and the only info I see says `Essential container in task exited`. I’ve been trying to debug this for a bit and am not sure how to proceed. Any advice?

    Nikhil Acharya

    10/02/2021, 1:14 PM
    How do we pass multiple parameters to a flow? Is the syntax below correct? (The second parameter is a 2D array.)
    flow.run(parameters=dict(studentId=student, data=data))
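The `parameters=dict(...)` call above is ordinary Python keyword syntax, so passing two parameters this way is fine for a local `flow.run`. As a general caution (not something confirmed in this thread), parameter values set from the UI or API travel as JSON, so a 2D array is safest as a nested list. A minimal stdlib check, where `student` and `data` are hypothetical sample values:

```python
import json

def json_round_trips(parameters: dict) -> bool:
    """Return True if the parameters survive a JSON encode/decode unchanged."""
    try:
        return json.loads(json.dumps(parameters)) == parameters
    except (TypeError, ValueError):
        # json.dumps raises TypeError for objects it cannot serialize (e.g. sets)
        return False

# Hypothetical sample values standing in for `student` and `data`.
student = 42
data = [[1, 2, 3], [4, 5, 6]]  # a 2D array as a nested list

params = dict(studentId=student, data=data)
print(json_round_trips(params))  # True

# A tuple-based 2D array comes back as lists, so it does not round-trip exactly.
print(json_round_trips(dict(studentId=student, data=((1, 2), (3, 4)))))  # False
```

A NumPy array would fail the same check, so converting it with `.tolist()` first is the usual workaround.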

    Tadej Svetina

    10/03/2021, 8:25 AM
    Hi, I am trying to set up task orchestration where all the tasks will be AWS Batch jobs or similar (no actual processing happens within a task). What's the recommended configuration here (e.g. which agent to use and how to configure it)? The setup should accommodate some concurrency (say, up to 10 flow runs and up to 40 task runs concurrently) with minimal resource usage, as these should only be API calls.
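Because each task only submits a job and waits on API calls, the work is I/O-bound, and threads are usually enough for this kind of concurrency. In Prefect 1 that points toward a thread-based executor such as `LocalDaskExecutor(scheduler="threads", num_workers=...)`, though whether that is the recommended setup here is an assumption. The stdlib sketch below only illustrates capping concurrent API-style calls at 40; `submit_batch_job` is a hypothetical stand-in that sleeps instead of calling AWS Batch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_TASKS = 40  # the task-level limit mentioned in the question

_lock = threading.Lock()
_in_flight = 0
peak_in_flight = 0

def submit_batch_job(job_id: int) -> str:
    """Hypothetical stand-in for an AWS Batch submit-and-poll call."""
    global _in_flight, peak_in_flight
    with _lock:
        _in_flight += 1
        peak_in_flight = max(peak_in_flight, _in_flight)
    try:
        time.sleep(0.01)  # simulate waiting on the remote API
        return f"job-{job_id}: SUCCEEDED"
    finally:
        with _lock:
            _in_flight -= 1

# Threads spend most of their time waiting, so 40 of them cost little CPU.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_TASKS) as pool:
    results = list(pool.map(submit_batch_job, range(100)))

print(len(results), peak_in_flight <= MAX_CONCURRENT_TASKS)  # 100 True
```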

    Fina Silva-Santisteban

    10/04/2021, 3:10 AM
    Hi everyone! We’re using a flow-of-flows for some of our work and noticed that when a task inside one of the flows fails, you only see a generic `some reference tasks failed` error message, and you can’t see the stack trace the way you can when you run a ‘simple’ flow. Do you have any suggestions for how to implement error logging for flow-of-flows? I was thinking of using a state handler, but, as the name suggests, those only know the ‘state’ of the flow and don’t have access to a stack trace. Curious to hear your thoughts! 🤔

    Klemen Strojan

    10/04/2021, 7:31 AM
    Hey all, is there a way to make an archived version of a flow active again? I have a flow that runs on a K8s agent, and the image is stored in a container registry. I push a new version to production, find a bug, and delete this new version. How do I now run the previous version of the flow without creating a new Docker image?

    Suraj Mittal

    10/04/2021, 11:09 AM
    Hi. Can someone here clarify how Prefect pricing works with mapped tasks? For example, if I have a list of 5 elements and use that list as a parameter for a Prefect mapped task, does the flow run count that as 5 task runs or 1 run for billing purposes?

    Vikram Thirumalai

    10/04/2021, 2:43 PM
    Hi all, we are intermittently getting this error in Prefect:
    botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DescribeTasks operation (reached max retries: 4): Rate exceeded
    We tried updating Dask, but we're still getting the error and aren't sure how to proceed.
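`ThrottlingException` means AWS is rate-limiting the `DescribeTasks` calls, and the message shows botocore already gave up after 4 retries. botocore's own retry behaviour can be tuned (through its `retries` client config, or the `AWS_RETRY_MODE` / `AWS_MAX_ATTEMPTS` environment variables), but whether whichever component makes these calls picks those settings up is an untested assumption. For calls you control, the usual technique is retrying with capped exponential backoff and jitter. A minimal stdlib sketch, where `ThrottledError` and `flaky_describe_tasks` are hypothetical stand-ins for the botocore error and the API call:

```python
import random
import time

class ThrottledError(Exception):
    """Hypothetical stand-in for botocore's ClientError with a ThrottlingException code."""

def call_with_backoff(fn, max_attempts=8, base_delay=0.5, max_delay=20.0, sleep=time.sleep):
    """Call fn(), retrying ThrottledError with capped exponential backoff plus full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except ThrottledError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            # Full jitter: wait a random time up to the capped exponential delay.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))

# Usage with a fake call that is throttled twice before succeeding.
calls = {"n": 0}

def flaky_describe_tasks():
    calls["n"] += 1
    if calls["n"] <= 2:
        raise ThrottledError("Rate exceeded")
    return {"tasks": []}

print(call_with_backoff(flaky_describe_tasks, sleep=lambda s: None))  # {'tasks': []}
```

The jitter spreads retries from many concurrent workers apart, which matters when they all hit the same rate limit at once.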

    Filip Lindvall

    10/04/2021, 2:57 PM
    What is the easiest way to get notified by the Prefect servers when flows fail, particularly at a high level? We have now twice been hit by flows silently failing on the Prefect server side: once because of a billing issue and once because of k8s connection issues. There should be a better way than looking for red on the dashboard?

    Adam Brusselback

    10/04/2021, 7:57 PM
    Hey everyone. I need to use the `PostgresExecute` task to run a specific function call, but the host/user/password it runs with is determined dynamically during flow execution. It seems like this isn't possible due to how the task is set up.

    Andrew Hannigan

    10/04/2021, 8:53 PM
    In Prefect we can define tasks as functions with the `@task` decorator, or as classes by subclassing `Task`. In general, when programming in Python and presented with these options, I typically reach for the subclassing option: I find I can think about problems at a higher level with OOP, it's easier to build up complex components from simpler ones, the abstraction is cleaner, etc. However, I do find it’s a bit clunky at times when building flows. Is it recommended to avoid the subclassing approach and use a functional approach instead when possible?

    Matt Alhonte

    10/04/2021, 9:04 PM
    the new Structural Pattern Matching looks like it'll be pretty meaningful for Prefect! https://www.python.org/dev/peps/pep-0622/

    Jacob Goldberg

    10/04/2021, 9:19 PM
    Is there a way to disable version incrementing when registering a flow with Prefect Cloud? (More details on the use case in the thread.)
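If the goal is to avoid a new version when the flow itself has not changed, Prefect 1's `flow.register` accepts an `idempotency_key`, and `flow.serialized_hash()` is a common value for it; whether that fits the use case in the thread is an assumption. The idea is simply a stable content hash, so identical definitions give identical keys. A stdlib illustration, where the dicts are hypothetical stand-ins for a serialized flow:

```python
import hashlib
import json

def content_hash(serialized_flow: dict) -> str:
    """Stable SHA-256 of a JSON-serializable flow description (key order ignored)."""
    canonical = json.dumps(serialized_flow, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Hypothetical serialized flows: same content with different key order, then a real change.
v1 = {"name": "etl", "tasks": ["extract", "load"]}
v1_again = {"tasks": ["extract", "load"], "name": "etl"}
v2 = {"name": "etl", "tasks": ["extract", "transform", "load"]}

print(content_hash(v1) == content_hash(v1_again))  # True: no new version needed
print(content_hash(v1) == content_hash(v2))        # False: the flow changed
```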

    Chris Leber

    10/04/2021, 9:30 PM
    Hello friends! I am running into some issues using an ECSCluster with the DaskExecutor. A brief overview of my setup: I'm using Prefect Cloud tied in with AWS infrastructure (e.g. running prefect-agent as a service on an ECS cluster). A flow might look something like this:
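    import prefect
    from prefect import Flow, task
    from prefect.executors import DaskExecutor
    from prefect.run_configs import ECSRun
    from prefect.storage import S3
    from dask_cloudprovider.aws import ECSCluster
    
    # some_function and task_definition are not shown in the original snippet
    file_list = ['./file_1.txt', './file_2.txt', './file_3.txt', './file_4.txt', './file_5.txt', './file_6.txt', './file_7.txt', ..., './file_100.txt']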
    
    @task
    def test_task(input_file):
        result = some_function(input_file)
        return result
    
    def ecs_cluster(n_workers=4):
        """Start an ECS cluster using the same image as the flow run."""
        return ECSCluster(
            n_workers=n_workers, image=prefect.context.image, region_name="us-east-1"
        )
    
    
    with Flow(
        "test flow",
        storage=S3(bucket="storage"),
        run_config=ECSRun(task_definition=task_definition),
    ) as flow:
        test_task.map(input_file=file_list)
    
    flow.executor = DaskExecutor(
        cluster_class=ecs_cluster, cluster_kwargs={"n_workers": 10}
    )

    Ismail Cenik

    10/04/2021, 11:11 PM
    Hello, when I register a flow, its schedule is automatically turned on, and I then have to turn it off manually. Is there a config option to make it off by default? This wasn't a problem a couple of months ago.

    Matt Alhonte

    10/05/2021, 1:07 AM
    I'm trying to run a flow of flows. The outer flow starts running, but the first of the inner flows fails with:
    Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
    (we're on 0.13.19). Anyone know what might be up?

    chicago-joe

    10/05/2021, 2:03 AM
    Hey guys, I'm wondering if anyone has experience using Prefect tasks within a class? Essentially, I'm working on a MySQL class that will handle authentication, opening/closing connections, and queries. The protected "auth" class uses a Prefect task to load the config from AWS Secrets Manager. It all works fine until I try to use the class within a flow, and then I run into errors. Any help would be greatly appreciated, cheers! https://gist.github.com/chicago-joe/98c165a596c6eb12337a07073f8e9b4d

    Chris White

    10/05/2021, 2:32 AM
    https://orion-docs.prefect.io/ 👀

    Bastian Röhrig

    10/05/2021, 8:03 AM
    Hey everyone, I'm having a weird issue that looks like a bug to me with child flows and custom reference tasks. More in thread.

    Wilson Bilkovich

    10/05/2021, 1:30 PM
    Hi folks. Does anyone know how to generate a Dask performance report (i.e. https://distributed.dask.org/en/latest/api.html#distributed.performance_report) for a Prefect flow that is using `DaskExecutor`? It looks like `dask.distributed.performance_report()` needs to run inside the context of the temporary Dask cluster that Prefect spins up, but I don’t see how to get into those internals.

    Kevin Weiler

    10/05/2021, 1:56 PM
    Hi there, I’m trying to write a state handler for a task that submits work to a service via a REST API. The state handler code is:
    def nomad_batch_node_cancel_handler(task: Task, old_state: State, new_state: State):
        if (new_state == Cancelled) & (old_state == Running):
            _LOGGER.info(f"killing job {nomad_job_name}")
            Nomad().stop_job(job_id=nomad_job_name)
    It gets `nomad_job_name` from an enclosing function (it’s a closure). When I click the “cancel” button in the UI, I don’t think this handler is getting executed: my log message isn’t there, and the task is not killed. Is there something wrong with how I’m testing for the state change? It seems to me that the task is in a `Running` state when I click cancel, and the UI seems to think it goes to a `Cancelled` state thereafter.
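One likely culprit, based on how Prefect 1's `State.__eq__` only treats same-type states as equal: `new_state == Cancelled` compares a state *instance* with the `Cancelled` *class*, which is always `False`, so the handler body never runs. `isinstance` checks the type instead. The sketch below uses minimal stand-in classes (not Prefect's own) to show the difference; whether the handler also gets a chance to run when the UI cancels the flow run is a separate question not settled here.

```python
class State:
    """Minimal stand-in for a Prefect 1 state; not the real class."""

    def __eq__(self, other):
        # Simplified from Prefect 1: states can only be equal if their types match.
        return type(self) == type(other)

class Running(State):
    pass

class Cancelled(State):
    pass

old_state, new_state = Running(), Cancelled()

# An instance compared with a class: the types differ, so this is always False.
print(new_state == Cancelled)  # False

def should_kill_job(old_state: State, new_state: State) -> bool:
    """Test state *types* with isinstance instead of ==."""
    return isinstance(new_state, Cancelled) and isinstance(old_state, Running)

print(should_kill_job(old_state, new_state))  # True
```

In the real handler that would read `if isinstance(new_state, Cancelled) and isinstance(old_state, Running):`, using the state classes the handler already imports.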

    Vincent

    10/05/2021, 2:12 PM
    Hello all. I recently bumped into this issue for my flow. Has anyone seen anything similar?
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 929b8b8d-b75c-4a7b-b0ff-e0433a903fee: provided a running state but associated flow run 3775f8a8-6d8c-452b-a69f-e51eb6bc07e7 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
        result = self.graphql(
      File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 929b8b8d-b75c-4a7b-b0ff-e0433a903fee: provided a running state but associated flow run 3775f8a8-6d8c-452b-a69f-e51eb6bc07e7 is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]

    Tony Yun

    10/05/2021, 3:29 PM
    Hi! I’m having some trouble getting proper logs from the `RunNamespacedJob` task. I had set the `log_level='info'` option, but when the job fails, no logs are sent to Prefect. Instead, I only get this in the UI:
    FAIL signal raised: FAIL('Job dbt-run-from-flow failed, check Kubernetes pod logs for more information.')
    After going to the k8s pod, I see exception logs:
    dbt ls --models tm_snowflake.* --profiles-dir=. --profile default --target dev
    Encountered an error while reading profiles:
      ERROR Runtime Error
      Compilation Error
        Could not render {{ env_var('DBT_PASSWORD') }}: Env var required but not provided: 'DBT_PASSWORD'
    Encountered an error:
    Runtime Error
      Could not run dbt
    make: *** [Makefile:6: ls] Error 2
    How can I pass logs from k8s pods to the Prefect UI?

    kiran

    10/05/2021, 5:07 PM
    Hi y’all. Has anyone gotten `flow.visualize()` to run on a Linux server (running the code, which lives on the server, from my Mac)? I’m having issues with `xdg-open` and also `gio`, and have now gone down several Google/Stack Overflow rabbit holes with only minor successes.

    ek

    10/05/2021, 6:00 PM
    Hello everyone, I'm trying to figure out the code below:
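    import my_lib
    from prefect import task, Flow
    from prefect.storage import S3
    
    @task
    def my_func():
        my_lib.func()
    
    with Flow("myflow") as flow:
        my_func()  # call the task inside the block so it becomes part of the flow
        flow.storage = S3(
            bucket="BUCKET_NAME",
            stored_as_script=True,  # should be a boolean, not the string "true"
            local_script_path="main.py",
        )
    # the first positional argument of register() is the project name
    flow.register('myflow')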
    My directory looks like this:
    .
    ├── main.py
    └── my_lib
        ├── __init__.py
        └── db.py
    I'm trying to push my code up to the S3 bucket, but my `my_lib` directory doesn't get packaged up along with `main.py`. What am I missing here?
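As far as I know, script-based storage uploads only the flow script named in `local_script_path`; it does not bundle sibling packages such as `my_lib`, so those have to be importable wherever the flow actually runs (for example installed into the agent's environment or baked into an image). A small stdlib pre-flight check you could run in that environment; the module names are just examples:

```python
import importlib.util

def missing_modules(names):
    """Return the top-level module names that cannot be found on the current sys.path.

    Only pass top-level names: a dotted name whose parent package is missing
    makes find_spec raise ModuleNotFoundError instead of returning None.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]

# "my_lib" is the local package from the question; "json" is always available.
print(missing_modules(["json", "my_lib"]))
```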

    Kevin Weiler

    10/05/2021, 6:38 PM
    is there any way for a task to know that its flow has been triggered by a Schedule vs. the API? Something in the context perhaps?

    Ken Nguyen

    10/05/2021, 7:04 PM
    Are we able to set a more specific time when setting the schedule to run once a day in the front end? For example, I'm trying to set my flow to run once a day at 7:32 AM.
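A once-a-day run at a specific minute is what a cron expression describes: `32 7 * * *` means minute 32 of hour 7, every day. Prefect 1 has a `CronClock` that accepts such strings, and the UI's schedule form may offer a cron option, though that is from memory rather than confirmed in this thread. To sanity-check when such a schedule fires, a minimal stdlib sketch computing the next 7:32 run (time zones are deliberately left out):

```python
from datetime import datetime, time, timedelta

def next_daily_run(now: datetime, at: time) -> datetime:
    """Next datetime at wall-clock time `at`, strictly after `now` (naive, no time zones)."""
    candidate = datetime.combine(now.date(), at)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate

# Same firing time as the cron expression "32 7 * * *".
print(next_daily_run(datetime(2021, 10, 5, 19, 4), time(7, 32)))  # 2021-10-06 07:32:00
```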

    Bob Colner

    10/05/2021, 7:34 PM
    I'm just now skimming the ‘Orion’ blog post. Am I reading this correctly that all existing Prefect Core workflows will not be supported ~1 year from now?

    Adam Brusselback

    10/05/2021, 9:16 PM
    Hey again... so let's say I have Prefect Server with two worker machines running local agents that connect to it, with the same flows/tasks deployed manually to both through Ansible. I must have misconfigured something, because it looks like when both agents are up and I run a job, it gets submitted to both agents.

    Frederick Thomas

    10/05/2021, 9:52 PM
    Hi all, I hope everyone is okay... Anyhow, we currently have Prefect set up in an Azure VM running Ubuntu 18.04, which is pushing the limit on disk usage. I ran:
    sudo du -h /mnt/data/prefect/ > log.txt
    and after reading the file found this:
    217G	/mnt/data/prefect/.prefect/results
    My question is: is it safe to delete the results in that folder, and if not, can they be stored elsewhere safely? Thanks!

Kevin Kho

10/05/2021, 9:56 PM
Yep, you can delete them if you don’t need them.
You can also turn off checkpointing; the downside is that you won’t be able to restart work from failure.

Frederick Thomas

10/05/2021, 10:00 PM
Documentation on checkpointing, in case they're dumb enough to turn it off, please?

Kevin Kho

10/06/2021, 12:33 AM
@task(checkpoint=False)
def ...
More info here
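Following the point above that the results can be deleted if you no longer need them, here is a minimal stdlib sketch for reclaiming space by pruning result files older than a cutoff. The 30-day window is an assumption, and anything deleted can no longer be used to restart the corresponding runs from failure, so try it with `dry_run=True` first.

```python
import time
from pathlib import Path

def prune_old_results(results_dir, max_age_days=30, dry_run=True):
    """Select files under results_dir last modified more than max_age_days ago.

    With dry_run=True (the default) nothing is deleted; otherwise the selected
    files are removed. Returns (list_of_paths, total_bytes).
    """
    cutoff = time.time() - max_age_days * 24 * 60 * 60
    # Materialise the listing first so deleting files doesn't disturb the walk.
    candidates = [p for p in Path(results_dir).rglob("*") if p.is_file()]
    selected, total_bytes = [], 0
    for path in candidates:
        stat = path.stat()
        if stat.st_mtime < cutoff:
            selected.append(path)
            total_bytes += stat.st_size
            if not dry_run:
                path.unlink()
    return selected, total_bytes

# Path from the question; report what would be removed without deleting anything.
# files, size = prune_old_results("/mnt/data/prefect/.prefect/results", dry_run=True)
# print(f"{len(files)} files, {size / 1e9:.1f} GB")
```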