prefect-community
  • m

    Martin T

    05/04/2022, 6:08 AM
    Debugging Prefect Flows with breakpoints. Hello friends. I love coding Python in VSCode; debugging with breakpoints makes me so efficient. But the debugging fun stops with Prefect Flows when we launch them from the CLI:
    $ prefect run --path myflow.py --log-level INFO --param myparam=val
    Here the development workflow becomes rather black-boxed and trial-and-error. Is it possible to somehow set breakpoints in VSCode, start a Prefect flow from the CLI, and attach the debugger to the Python process? (Running with the LocalExecutor)
    :discourse: 1
    k
    • 2
    • 9
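One way to keep VSCode breakpoints working is to skip the CLI during development and run the flow in-process from a normal Python entry point; a minimal sketch, where the flow, task, and parameter names are illustrative:

    # myflow.py (illustrative)
    from prefect import Flow, Parameter, task

    @task
    def greet(myparam):
        # a breakpoint set here is hit when the file runs under the VSCode debugger
        print(f"myparam={myparam}")

    with Flow("myflow") as flow:
        myparam = Parameter("myparam", default="val")
        greet(myparam)

    if __name__ == "__main__":
        # flow.run() executes everything in the current process with the default
        # LocalExecutor, so the debugger stays attached
        flow.run(parameters={"myparam": "val"})

Launching this file with a standard "Python: Current File" debug configuration then behaves much like the prefect run command above.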
  • j

    Jelle Vegter

    05/04/2022, 11:52 AM
    Hi all, I’m trying to set a scheduled dependency between flows so that flow B starts when flow A finishes (after being scheduled by time). I found this: https://docs.prefect.io/core/idioms/flow-to-flow.html but it’s not very clear to me how I can implement it.
    a
    • 2
    • 1
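The flow-to-flow idiom boils down to an orchestrator flow that creates and waits for child flow runs; a minimal sketch, assuming both child flows are already registered (project and flow names are placeholders):

    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("parent-flow") as flow:
        run_a = create_flow_run(flow_name="flow-a", project_name="my-project")
        # raise_final_state makes the parent fail if flow A fails
        wait_a = wait_for_flow_run(run_a, raise_final_state=True)

        run_b = create_flow_run(flow_name="flow-b", project_name="my-project")
        # flow B is only triggered once flow A has finished
        run_b.set_upstream(wait_a)

Scheduling the parent flow by time then gives the "A, then B" behaviour without scheduling B separately.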
  • a

    Abuzar Shaikh

    05/04/2022, 12:06 PM
    Hey guys, I want to set up a flow dependency for my tasks. I'm running a parent flow that contains a child flow which downloads some data, and after that data has been downloaded I want to run some follow-up tasks. When I try this, the download flow runs successfully but the tasks after it end up failing. Any idea why that could be happening and how I can fix it?
    a
    • 2
    • 3
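When a child flow is started from a parent flow, downstream tasks will not wait for it unless the dependency is made explicit; a minimal sketch of the usual fix, with placeholder names for the download flow and the follow-up task:

    from prefect import Flow, task
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    @task
    def process_downloaded_data():
        ...  # whatever needs the downloaded data

    with Flow("parent") as flow:
        download_run = create_flow_run(flow_name="download-flow", project_name="my-project")
        download_done = wait_for_flow_run(download_run, raise_final_state=True)
        # without upstream_tasks this task would start immediately and fail
        process_downloaded_data(upstream_tasks=[download_done])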
  • i

    Ievgenii Martynenko

    05/04/2022, 12:45 PM
    Hi, I'm looking through https://github.com/PrefectHQ/prefect/discussions/4042 to work out the most suitable deployment approach. We have a single Agent acting more as an orchestrator and 1000 dataflows that use DockerRun() and S3 storage. To set up CI/CD we have to:
    1. Copy the whole project to the Agent host and register all flows by means of some custom Python scripts (which we have to write). The dataflows are saved to S3 at this point.
    2. Ensure that the image used in DockerRun() has all requirements installed and environment variables set.
    3. Execute the dataflow from the UI, which fetches the flow from S3, starts a new Docker container from the predefined image, and executes the flow in that container.
    I'm a bit confused that a copy of all dataflows will sit on the Agent, when in fact the Agent is not a worker. Can this approach be simplified?
    a
    m
    • 3
    • 33
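For that kind of setup, the "custom Python script" can be a small registration entry point that CI runs; only this script needs the project code at registration time, since the flow ends up in S3 and execution happens in the Docker containers, not on the Agent host. A sketch, where the module collecting the Flow objects, the bucket, and the image are placeholders:

    from prefect.run_configs import DockerRun
    from prefect.storage import S3

    from myproject.flows import all_flows  # hypothetical list of Flow objects

    for flow in all_flows:
        flow.storage = S3(bucket="my-prefect-flows")
        flow.run_config = DockerRun(image="my-registry/prefect-base:latest")
        # idempotency_key skips re-registration when the flow hasn't changed
        flow.register(project_name="my-project", idempotency_key=flow.serialized_hash())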
  • m

    Michail Melonas

    05/04/2022, 12:45 PM
    I have a scheduled flow that is failing due to a credential issue:
    Error downloading Flow from S3: An error occurred (ExpiredToken) when calling the GetObject operation: The provided token has expired.
    I’m using AWS SSO credentials to upload the flow to S3:
    flow.storage = S3(
        bucket=Config.PREFECT_S3_BUCKET,
        client_options={
            "aws_access_key_id": Config.AWS_ACCESS_KEY_ID,
            "aws_secret_access_key": Config.AWS_SECRET_ACCESS_KEY,
            "aws_session_token": Config.AWS_SESSION_TOKEN,
        },
    )
    Initially, the flow runs. However, flows scheduled after the token expiry fail with the above error. How do I both use SSO to upload flows to S3 and have them scheduled on Prefect Cloud?
    a
    p
    • 3
    • 15
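One common workaround is to not bake short-lived credentials into the storage definition at all: leaving client_options out lets boto3 fall back to its default credential chain, which can use the local SSO profile at upload time and whatever credentials the agent's environment provides (e.g. an IAM role) for scheduled runs. A minimal sketch:

    from prefect import Flow
    from prefect.storage import S3

    with Flow("my-flow") as flow:
        ...

    # no client_options: boto3 resolves credentials from its default chain,
    # so nothing that can expire is stored with the flow
    flow.storage = S3(bucket="my-prefect-flows")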
  • p

    Patrick Tan

    05/04/2022, 1:47 PM
    Hi, do you have an example of building a flow with S3 storage using AWS credentials (e.g. a session token)? The example in the docs does not include credentials:
    from prefect import Flow
    from prefect.storage import S3
    
    flow = Flow("s3-flow", storage=S3(bucket="<my-bucket>"))
    k
    a
    • 3
    • 5
  • b

    Birkir Björnsson

    05/04/2022, 2:33 PM
    Hi, I am new to Prefect, but over the last few days I have managed to create a few flows and execute them from Prefect Cloud with local agents. I have also been playing around with running the agent in a Docker container with the scripts inside the container, and that works when the container is running locally. So next I decided to build a Docker image and run it on GCP Cloud Run, with a CI/CD pipeline using GitHub Actions. The image builds but the deployment fails with this error: Cloud Run error: The user-provided container failed to start and listen on the port defined by the PORT=8080 environment variable. Logs for this revision might contain more information. Logs URL: https://console.cloud.google.com/logs/viewer?project=dkpos-dev&resource=cloud_run_revision/service_name/dkpos-dev/revision_name/dkpos-dev-00002-dam&advancedFilter=resource.type%3D%22cloud_run_revision%22%0Aresource.labels.service_name%3D%22dkpos-dev%22%0Aresource.labels.revision_name%3D%22dkpos-dev-00002-dam%22 For more troubleshooting guidance, see https://cloud.google.com/run/docs/troubleshooting#container-failed-to-start Has anyone managed to get this running on GCP Cloud Run? Any hints are most welcome; I have no idea how to solve this port issue. I am of course open to other ways to deploy and run my projects in containers, basically looking for the simplest solution if possible 😄 Thanks in advance, Birkir
    k
    • 2
    • 2
  • j

    Jason

    05/04/2022, 2:50 PM
    When using classes with @task, I'm assuming that the class needs to be instantiated each time, in each task? For example:
    @task
    def task_a():
        common_class = Common(Secret())
        output = common_class.task_a()
        return output
    
    @task
    def task_b():
        common_class = Common(Secret())
        output = common_class.task_b()
        return output
    k
    a
    • 3
    • 4
  • j

    Jonathan Mathews

    05/04/2022, 3:43 PM
    Hi, I’m trying to use the Snowflake task to execute a “COPY INTO” query. It works fine with SELECT queries, but it just isn’t running the query when I switch it to a COPY INTO, and the task comes back with TriggerFailed.
    k
    a
    • 3
    • 18
  • m

    Michelle Brochmann

    05/04/2022, 4:23 PM
    Newbie question here - why can’t a single agent take care of the various types of run configs (Docker, Local, etc.)?
    k
    • 2
    • 3
  • m

    Michelle Brochmann

    05/04/2022, 4:56 PM
    I am trying to run two versions of a Flow on a Docker container using Docker Storage - the only (intended) difference is that one is built from tasks created via the @task decorator, and the other is built from tasks subclassed from the Task object. The decorator version runs fine, but the subclassed version results in this error when the Docker container is being built:
    /opt/prefect/healthcheck.py:130: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules https://docs.prefect.io/api/latest/storage.html#docker
    flows = cloudpickle_deserialization_check(flow_file_paths)
    Is there an obvious reason why this would happen? My tasks are defined in a module that I’m importing into the file where the Flows are defined. I can provide more info if needed!
    k
    • 2
    • 27
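A common cause of that warning is that the module defining the subclassed tasks exists on the machine building the image but not inside it, so the healthcheck cannot deserialize the flow. One workaround is to copy the module into the image and put it on PYTHONPATH via the Docker storage options; a sketch with placeholder paths and registry:

    from prefect import Flow
    from prefect.storage import Docker

    with Flow("subclassed-tasks-flow") as flow:
        ...

    flow.storage = Docker(
        registry_url="my-registry.example.com",
        image_name="subclassed-tasks-flow",
        # copy the module that defines the Task subclasses into the image...
        files={"/local/path/my_tasks.py": "/modules/my_tasks.py"},
        # ...and make it importable when the flow is unpickled
        env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
    )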
  • j

    Jonathan Mathews

    05/04/2022, 5:07 PM
    Hi, what’s the correct way to use EnvVarSecret within a Flow? Currently my code is below, but it tells me the task has not been added to the flow. If I don’t call it with .run() it complains that it is not JSON serializable:
    with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
        snowflake_password=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True).run()
    ....
        upload_results = SnowflakeQuery(
            user=SNOWFLAKE_USER,
            password=snowflake_password,
            account=SNOWFLAKE_ACCOUNT,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            role=SNOWFLAKE_ROLE,
            schema=SNOWFLAKE_SCHEMA).map(query=queries)
    k
    • 2
    • 8
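Calling .run() resolves the secret at build time instead of adding it to the flow, and passing the unresolved task straight into the SnowflakeQuery constructor is what triggers the JSON serialization complaint. The usual pattern is to call the EnvVarSecret instance inside the flow context and pass its result into a task as a runtime argument; a minimal sketch, where the downstream task is a placeholder for the Snowflake call:

    from prefect import Flow, task
    from prefect.tasks.secrets import EnvVarSecret

    snowflake_password = EnvVarSecret(name="SNOWFLAKE_PASSWORD", raise_if_missing=True)

    @task
    def run_queries(password, queries):
        ...  # hand the resolved secret to the Snowflake client here

    with Flow("snowflake-flow") as flow:
        password = snowflake_password()  # called, not .run(): added to the flow
        run_queries(password, ["SELECT 1"])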
  • l

    Lon Nix

    05/04/2022, 6:32 PM
    I noticed that Prefect is a CNCF member but I didn't see it anywhere as a product in the Landscape. Did I miss something?
    k
    a
    • 3
    • 17
  • z

    Zach Schumacher

    05/04/2022, 6:37 PM
    Is there any mechanism in Prefect for retrying a flow? I know I can set retries on tasks, but sometimes weird things happen with the k8s autoscaler and the flow run never gets a pod assigned. In those cases I want to retry the entire flow with backoff, but I'm not finding where I would configure that. The run never actually starts a task, so defining task retries doesn't help me here. I've tried defining retries on the k8s job template in the past, but I've found that the Prefect job will sometimes get confused about the state of the flow, as it's not directly managing the retries.
    k
    m
    • 3
    • 4
  • c

    Chris Reuter

    05/04/2022, 6:50 PM
    Hey all! @justabill, @Charleigh Liang and I are going live in 10 minutes to chat about the history of dataflow automation. If you like reminiscing about old architecture decisions then this will be the stream for you! See you there on PrefectLive!
  • j

    Jonathan Mathews

    05/04/2022, 8:33 PM
    Hi again! If I’m using a Prefect task (such as SnowflakeQuery or dbtshell) and want to set custom logging, how do I do that? For example, at the moment I have “Task ’SnowflakeQuery[52]’ Starting Run” … “Task ’SnowflakeQuery[53]’ Starting Run” … “Task ’SnowflakeQuery[54]’ Starting Run”, and instead I want to have “File load query file1.txt” … “File load query file2.txt”. Is that possible? Can I just pass in names and mappings via task_args? Also, how do I set retries? Do I pass those via task_args too?
    k
    • 2
    • 5
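In the functional API, both the display name and retry settings can be overridden per call through task_args, which copies the task with the new attributes; a sketch with placeholder connection details and query:

    from datetime import timedelta

    from prefect import Flow
    from prefect.tasks.snowflake import SnowflakeQuery

    load_file = SnowflakeQuery(account="...", user="...", password="...", warehouse="...")

    with Flow("copy-into") as flow:
        load_file(
            query="COPY INTO my_table FROM @my_stage/file1.txt",
            task_args=dict(
                name="File load query file1.txt",   # shown instead of SnowflakeQuery[n]
                max_retries=3,
                retry_delay=timedelta(minutes=1),
            ),
        )

The same task_args dict can be passed to .map(...), though a mapped task gets a single name and its children appear under that name with their map index.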
  • t

    Tabari Brannon

    05/04/2022, 8:37 PM
    Newbie question - I am getting this error when I try to register my flow.
    File "C:\etls\tractor_hours_analyzer\tractor_hours_analyzer.py", line 89, in <module>
        transformed = transform_add_files_to_dataframe(extracted)
        raise TypeError('too many positional arguments') from None
    TypeError: too many positional arguments
    k
    • 2
    • 12
  • r

    Raviraja Ganta

    05/04/2022, 8:51 PM
    I am trying to use GitHub Actions for assigning storage to flows and registering them. Does it need to know about all the packages used by the flows while doing that as well?
    a
    • 2
    • 12
  • s

    Sean Leakehe

    05/04/2022, 11:01 PM
    Is it possible to set a Flow parameter’s type in code? When I go into one of our parametrized flows in Prefect Cloud, each parameter has a Type dropdown. I’d like to explicitly set each of our Parameters to one of the types from the dropdown.
    k
    • 2
    • 3
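Parameter doesn't take an explicit type argument, but the type shown in the Cloud UI generally follows the JSON type of the default value, so giving each Parameter a default of the right type is the usual way to pin it from code; a small sketch with illustrative names:

    from prefect import Flow, Parameter, task

    @task
    def show(n, name, tags):
        print(n, name, tags)

    with Flow("typed-parameters") as flow:
        n = Parameter("n", default=10)            # integer in the UI
        name = Parameter("name", default="abc")   # string
        tags = Parameter("tags", default=["a"])   # array
        show(n, name, tags)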
  • b

    Baris Cekic

    05/04/2022, 11:10 PM
    Is it possible to create a deployment inline from code instead of using the CLI? I am trying to run the Kubernetes tutorial and all looks good, but in my case I need to dynamically create flows, tasks, and deployments, and I want to create the deployment inline. Is there any code example? PS: the question is about Orion.
    k
    • 2
    • 1
  • b

    Baris Cekic

    05/04/2022, 11:12 PM
    My second question is about storage: when will git storage be available in Orion? I am trying to use S3 storage, but it uses the bucket root directory for deployments and I cannot provide a subfolder in the bucket.
    k
    • 2
    • 1
  • b

    Baris Cekic

    05/04/2022, 11:15 PM
    My third question is about the Kubernetes Orion installation. Every time I start my Docker/Kubernetes environment the services become active, but I have to reconfigure storage and recreate work queues, and all deployments/flows are gone from the UI. Isn’t the data or the pod’s disk persisted?
    k
    a
    • 3
    • 5
  • e

    Edmondo Porcu

    05/05/2022, 1:20 AM
    Still getting to grips with Databricks, and I guess with Tasks in general. Databricks has added git support for jobs, and the current DatabricksSubmitMultitaskRun doesn't support it. I am in doubt among the possible approaches:
    • Create a custom DatabricksSubmitMultitaskRun implementation. However, I would be accessing underscore methods in the prefect package.
    • Use the databricks CLI Python library to create a job and then just run it via Prefect.
    • Other?
    The real problem is that the Task does not allow dependency injection (i.e. the Databricks client is created within the run function, so it's not easy to override it). I find the design of the Task concerning in the sense that it is not extensible; one needs to rewrite it from scratch.
    k
    a
    • 3
    • 20
  • b

    Ben Ayers-Glassey

    05/05/2022, 3:40 AM
    In old-school Prefect (not Orion), is there a workaround for adding build args (i.e. Dockerfile's ARG instead of ENV)? Currently Prefect's Docker class takes an env_vars dict, but no build_args dict, which would be nice. Here is the place in the Docker class where ENVs are generated: https://github.com/PrefectHQ/prefect/blob/8e04ccad071d1127afc7ca3580f1fe6f4e884f27/src/prefect/storage/docker.py#L437-L442 ...the ARGs could go right above there. In any case, ultimately what I'm trying to do is get the Docker class to install python_dependencies from our private PyPI server. Is there a good way to do that?
    k
    m
    • 3
    • 55
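Until build args are supported directly, one workaround is to hand the Docker storage class your own Dockerfile, which can declare its own ARGs and point pip at the private index; whether build arguments can also be forwarded through build_kwargs to the underlying docker build call is an assumption here, and the registry and index URL are placeholders:

    from prefect import Flow
    from prefect.storage import Docker

    with Flow("private-deps-flow") as flow:
        ...

    flow.storage = Docker(
        registry_url="my-registry.example.com",
        image_name="private-deps-flow",
        # a hand-written Dockerfile can declare ARG PIP_INDEX_URL and run
        # pip install against the private server itself
        dockerfile="Dockerfile",
        # assumption: build_kwargs is passed through to the docker build call,
        # so buildargs would populate the ARG above
        build_kwargs={"buildargs": {"PIP_INDEX_URL": "https://pypi.internal.example.com/simple"}},
    )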
  • b

    Ben Ayers-Glassey

    05/05/2022, 5:27 AM
    Thread about loading flows from modules instead of .py files or cloudpickle-serialized blobs =>
    a
    • 2
    • 5
  • r

    Raviraja Ganta

    05/05/2022, 5:50 AM
    I have registered the flows via GitHub Actions and the agent is running. When I run the flow, it looks like it gets stuck. Can someone help me debug this, or explain what is happening here?
    a
    • 2
    • 2
  • b

    Bernard Greyling

    05/05/2022, 6:17 AM
    Morning all. We are using Prefect via a Kubernetes agent. We've noticed that failed flows don't clean up their Prefect jobs, and in some cases the dask-executor either. Is there any passive mechanism to clean up these dead jobs?
    etl                prefect-job-29a6d24a-hwnzx                         0/1     ImagePullBackOff             0          6d20h
    etl                prefect-job-30bd9ad7-zsc6z                         0/1     CreateContainerConfigError   0          11h
    etl                prefect-job-39468921-lt95t                         0/1     CreateContainerConfigError   0          6d20h
    etl                prefect-job-3c1aee81-hvbv4                         0/1     Completed                    0          7h49m
    etl                prefect-job-3c84d152-wfqw9                         0/1     Completed                    0          7h53m
    etl                prefect-job-3fb2397e-7dc5b                         0/1     ImagePullBackOff             0          11h
    etl                prefect-job-5ba165eb-rfxhq                         0/1     CreateContainerConfigError   0          6d20h
    etl                prefect-job-62ae93a3-ptbcr                         0/1     CreateContainerConfigError   0          6d20h
    etl                prefect-job-6cfd964c-p7xpl                         0/1     ImagePullBackOff             0          6d20h
    etl                prefect-job-86e07407-nmpwb                         0/1     ImagePullBackOff             0          6d20h
    etl                prefect-job-9478c771-bfg26                         0/1     ImagePullBackOff             0          11h
    etl                prefect-job-950ba45b-tv4f2                         0/1     CreateContainerConfigError   0          11h
    etl                prefect-job-a199b1d6-6vwmd                         0/1     Completed                    0          7h49m
    etl                prefect-job-a4976ce3-gxqv6                         0/1     Completed                    0          6h5m
    a
    m
    m
    • 4
    • 7
  • j

    Julian Brendel

    05/05/2022, 7:16 AM
    Good morning prefect community 😊. We are evaluating a switch to either Prefect or Dagster. On the surface the feature sets look quite similar. Do you have any resources that could help us decide which solution to choose?
    👋 1
    a
    • 2
    • 9
  • i

    Ievgenii Martynenko

    05/05/2022, 7:48 AM
    Morning, we've set up a new Prefect env in K8s, but some old dataflows have DockerRun() as their run_config. These dataflows are failing with the error: "Flow run 46a30aa1-11f0-4c77-bb56-afab879db8bc has a run_config of type DockerRun, only KubernetesRun is supported". I suppose Prefect knows it runs in K8s, so all dataflows should now use the K8s agent, right?
    a
    • 2
    • 2
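The Kubernetes agent only picks up flows whose run_config it supports, so the old dataflows need their run_config swapped to KubernetesRun and re-registered; a minimal sketch, with the image name as a placeholder for the one previously used with DockerRun():

    from prefect import Flow
    from prefect.run_configs import KubernetesRun

    with Flow("legacy-dataflow") as flow:
        ...

    # replace the old DockerRun with a KubernetesRun pointing at the same image
    flow.run_config = KubernetesRun(image="my-registry/my-flow-image:latest")
    flow.register(project_name="my-project")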
  • m

    Mini Khosla

    05/05/2022, 8:51 AM
    Hi all, we are using Prefect Server 0.15.7 and running the flows on a YARN cluster. I have 20 mapped tasks, and each mapped task loops over a list, does some processing, and saves output to Hadoop. I am getting the below error intermittently. Any help would be appreciated.
    a
    k
    • 3
    • 3
a

Anna Geller

05/05/2022, 10:28 AM
Are you running Dask on a Hadoop cluster? The error mentions something about unmanaged history, so it seems like a Dask issue. Can you share the code for this flow (I'm mainly interested in the flow structure and the configuration of the Flow object, not what the tasks are doing)? Which agent did you use?
k

Kevin Kho

05/05/2022, 1:49 PM
The KilledWorker error indicates you have some kind of package mismatch between the Dask client, scheduler, and workers.
:upvote: 1