prefect-community

    Thomas Fredriksen

    09/20/2021, 12:29 PM
    Hello everyone. I'm trying to figure out what is considered best practice for differential data loads and backfills. I am trying to obtain the timestamp of the last successful run of a flow in order to fetch data added or changed since the last run. Using the Prefect Client.get_flow_run_info function seems to be one way of achieving this, but it may not scale as the number of runs for a flow grows large.
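    For reference, a sketch of how the last successful run could be fetched in a single query instead of paging run info, assuming the Prefect 1.x GraphQL schema exposed by Server/Cloud (the flow name is a placeholder):
    from prefect import Client

    client = Client()
    # fetch only the most recent successful run, so cost stays flat as history grows
    result = client.graphql(
        """
        query {
          flow_run(
            where: {
              flow: { name: { _eq: "my-flow" } }
              state: { _eq: "Success" }
            }
            order_by: { end_time: desc }
            limit: 1
          ) {
            end_time
          }
        }
        """
    )
    last_success = result.data.flow_run[0].end_time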

    Gareth Dwyer

    09/20/2021, 12:55 PM
    Maybe a stupid question, but can anyone tell me what the Blue/Green colouring means on the DAG overview diagram?

    Anatoly Alekseev

    09/20/2021, 2:16 PM
    Hi! How do I make sure a Prefect flow that is scheduled to run every 10 minutes is NOT run if the previous run is still in progress?
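    One workaround, sketched under the assumption of a Prefect 1.x Server/Cloud backend (the flow name is a placeholder; Cloud also offers flow-level concurrency limits on some plans): make the first task abort the run if another run of the same flow is still Running.
    from prefect import task, Client
    from prefect.engine.signals import ENDRUN
    from prefect.engine.state import Cancelled

    @task
    def abort_if_already_running():
        client = Client()
        result = client.graphql(
            """
            query {
              flow_run(where: {
                flow: { name: { _eq: "my-flow" } }
                state: { _eq: "Running" }
              }) { id }
            }
            """
        )
        # the current run is itself Running, so more than one means overlap
        if len(result.data.flow_run) > 1:
            raise ENDRUN(state=Cancelled("Previous run still in progress"))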

    García Gimenez Ignacio - 6to 2da

    09/20/2021, 2:39 PM
    Hi guys! I'm getting this error when using the UI. I'm running a server and an agent in a Docker container. {"type":"http-log","timestamp":"2021-09-20T14:38:29.787+0000","level":"error","detail":{"operation":{"user_vars":{"x-hasura-role":"admin"},"error":{"path":"$.selectionSet.flow_run.args.where.flow.project_id._eq","error":"unexpected null value for type \"uuid\"","code":"validation-failed"},"request_id":"0b7c4c08-10ce-4782-9b6b-4a2e3ee547bf","response_size":142,"request_mode":"error"},"request_id":"0b7c4c08-10ce-4782-9b6b-4a2e3ee547bf","http_info":{"status":400,"http_version":"HTTP/1.1","url":"/v1alpha1/graphql","ip":"172.21.0.6","method":"POST","content_encoding":null}}} Any suggestions?

    Martin

    09/20/2021, 9:59 PM
    Hello! When using ResourceManager, is there a way to make sure that the cleanup method is called when the flow is cancelled from the UI? From some testing I noticed that cleanup wasn't called before the flow was cancelled.

    Shaoyi Zhang

    09/21/2021, 12:10 AM
    Hi! I’m trying to leverage Kubernetes for Prefect. Our K8s agent is up and is receiving flow runs properly. However, the K8s job creation is failing due to
    Error creating: pods "prefect-job-138dbd9d-v6rq5" is forbidden: failed quota: xxxxx-request-quota: must specify requests.cpu,requests.memory
    although I specified the request limit with
    env:
    .....
    - name: JOB_MEM_REQUEST
      value: "2048Mi"
    - name: JOB_MEM_LIMIT
      value: "4096Mi"
    - name: JOB_CPU_REQUEST
      value: "100m"
    - name: JOB_CPU_LIMIT
      value: "200m"
    This is the Prefect source code, if I understand correctly, and I'm using prefect:0.15.5-python3.6. Any suggestions?
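    In case it helps while debugging the env-var route, Prefect's KubernetesRun also accepts these resource settings directly; a minimal sketch (values copied from the env block above):
    from prefect.run_configs import KubernetesRun

    flow.run_config = KubernetesRun(
        cpu_request="100m",
        cpu_limit="200m",
        memory_request="2048Mi",
        memory_limit="4096Mi",
    )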

    Anh Nguyen

    09/21/2021, 2:11 AM
    Hi! The first ETL flow run succeeded, but the next scheduled run doesn't start. Please help me.

    Lucas Beck

    09/21/2021, 8:07 AM
    Hi everyone, I have a flow that spins up n different jobs in a k8s cluster to perform some computationally heavy tasks. Sometimes when I cancel the flow, or it fails for whatever reason, I then need to manually clean up the n jobs in the k8s cluster. To avoid this manual work, I am trying to create a state handler to deal with that. The handler works for the Failed state, but not for cancellation. I have tried both Cancelling and Cancelled. For illustration purposes, there is an example below:
    import os
    import uuid
    import time
    import prefect
    from prefect import Flow, Task, Parameter
    from prefect.executors.dask import LocalDaskExecutor
    from prefect.utilities.edges import unmapped


    def clean_up(task, old_state, new_state):
        logger = prefect.context.get("logger")
        task_full_name = prefect.context.get("task_full_name")
        if isinstance(new_state, prefect.engine.state.Failed):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Cancelled):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )
        elif isinstance(new_state, prefect.engine.state.Cancelling):
            logger.info(
                f"cleaning up job {task.run_id}-{task_full_name} due to new state being {new_state}"
            )


    class DoSomeComputation(Task):
        def __init__(self, **kwargs):
            self.run_id = str(uuid.uuid4())[:8]
            super().__init__(**kwargs)

        def run(self, input, fail=True):
            logger = prefect.context.get("logger")
            logger.info(f"Started some computation task with input {input}")
            for i in range(0, 200):
                time.sleep(1)
            if fail:
                raise Exception("Something terrible happened")
            logger.info("Successfully completed some computation task")


    if __name__ == "__main__":
        project_name = "test-flows"
        flow_name = "test-cleanup"
        with Flow(flow_name) as flow:
            fail = Parameter("fail")
            do_some_comp = DoSomeComputation(
                state_handlers=[clean_up], task_run_name="{task_name}-job-name={input}"
            )
            inputs = ["input1", "input2", "input3"]
            do_some_comp.map(inputs, unmapped(fail))

        flow.executor = LocalDaskExecutor(num_workers=20)
        flow.register(project_name=project_name)
    The "cleaning up job..." log never gets triggered when I cancel the job. Anyone has an ideia or also experienced this? PS: I am using the
    KubernetesRun
    flow config

    Andrey Nikonov

    09/21/2021, 9:05 AM
    Hello everyone! How do I get a task's timeout value from GraphQL? When I query the Interactive API I get an error. My request:
    query DMS {
      flow(
        where: {
          name: { _eq: "Daily metrics summary" }, archived: { _eq: false }
        }
      ) {
        id, version,
        tasks {
          id, name,
          task_runs(order_by: { start_time: desc }, limit: 1) {
            start_time, end_time
          },
          # Here I'm getting the error (see below)
          timeout
        }
      }
    }

    Ruslan Aliev

    09/21/2021, 12:25 PM
    Hi, folks. I need to run create_container after whichever task in the ifelse statement finishes successfully. How do I implement the upstream dependency between create_container and the ifelse branches?
    @task(name="Download video", log_stdout=True)
    def download_video(task_params, destination_dir='./data'):
        remote_file_path = f"{task_params['storage']['source']}{task_params['video']['filepath']}"
        destination_dir = Path(destination_dir)
        destination_dir.mkdir(exist_ok=True)
        destination_path = f"{destination_dir / Path(remote_file_path).name}"
        file_download(remote_file_path, destination_path)
    
    
    @task(name="Fetch previous result", log_stdout=True)
    def fetch_prev_result(task):
        prev_result = task['prev_result']
        db_name = prev_result['db_name']
        collection_name = prev_result['collection_name']
        out_path = f"{db_name}_{collection_name}.csv"
        mongo_fetch(mongo_client, db_name, collection_name, out_path=out_path)
    
    
    is_detection = check_equal(task_type, 1)
    ifelse(is_detection, download_video(task_params, destination_dir='./videos'), fetch_prev_result(task))
    command = '''python -c "from prefect import Flow; f = Flow('empty'); f.run()"'''
    container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
    container_id = create_container(image_name=image_name,
                                    container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                    command=command).set_upstream([download_video, fetch_prev_result])
    started = start_container(container_id=container_id)
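    One shape that might work here, sketched with Prefect 1.x control flow (merge resolves to whichever branch actually ran, giving a single task to depend on):
    from prefect.tasks.control_flow import ifelse, merge

    video = download_video(task_params, destination_dir='./videos')
    prev = fetch_prev_result(task)
    ifelse(is_detection, video, prev)
    branch_done = merge(video, prev)  # succeeds when either branch finishes
    container_id = create_container(
        image_name=image_name,
        container_name=container_name(scenario_id=scenario_id, task_id=task_id),
        command=command,
        upstream_tasks=[branch_done],
    )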

    maz

    09/21/2021, 4:52 PM
    Hi, all. I am new to Prefect and trying to create a workflow with the following needs: 1. Get a list of files from S3. 2. Iterate over the files, transpose the values in each file, and dump them to a db. I have the following Python functions: • function 1 -> get S3 object • function 2 -> read the S3 file content and transpose the values • function 3 -> write the transposed values to the db. What I am looking for is how to link them in a flow so that functions 1, 2, and 3 are executed for all the files I have on S3. How would I do it using Prefect? Thanks, Maz
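    A minimal sketch of that shape using Prefect's task mapping (all names and bodies below are placeholders for the three functions described):
    from prefect import Flow, task

    @task
    def list_s3_keys():
        ...  # function 1: return the list of S3 object keys

    @task
    def transpose_file(key):
        ...  # function 2: read one file's content and transpose the values

    @task
    def write_to_db(rows):
        ...  # function 3: write the transposed values to the db

    with Flow("s3-etl") as flow:
        keys = list_s3_keys()
        transposed = transpose_file.map(keys)
        write_to_db.map(transposed)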

    Max Kureykin

    09/21/2021, 5:13 PM
    Hi everyone! Is it possible to pass a SQLAlchemy PG session between tasks? I've tried to create one, but ran into pickling problems. Is there a predefined way to handle this kind of thing (sessions/temp objects)?
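    Sessions and engines generally don't pickle, so a common workaround (a sketch, not a Prefect-specific API) is to pass the connection string between tasks and build the session inside the task body:
    from prefect import task
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    @task
    def do_db_work(dsn):
        # everything unpicklable is created inside the task
        engine = create_engine(dsn)
        Session = sessionmaker(bind=engine)
        session = Session()
        try:
            ...  # queries go here
            session.commit()
        finally:
            session.close()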

    Daniel Saxton

    09/21/2021, 5:17 PM
    hi all, does Prefect Cloud have a way to view logs from flows, or is it recommended to integrate with another service for consuming log data?

    Danny Vilela

    09/21/2021, 5:51 PM
    Hi all! I have a state handler for alerting me when a Task is retrying. It tells me which Task failed, when it’ll retry, how many times this task has retried, etc. I think I’d really rather just have that same state handler on the top-level Flow to avoid setting that state handler on each task. I noticed the signatures for Task state handlers are different than those for the Flow state handlers — is there an easy way to have a Flow state handler that can access Task-level information? For example, given a Flow that’s changing states, can we readily access the specific Task in question?
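    A sketch of one possibility, assuming the Prefect 1.x behaviour where a finished flow state's result maps tasks to their final states (treat the attribute layout as an assumption, and note the flow state may not change while an individual task retries):
    def flow_handler(flow, old_state, new_state):
        if new_state.is_finished():
            # new_state.result: dict of task -> final task state
            for task, task_state in new_state.result.items():
                if task_state.is_failed():
                    print(f"{task.name} ended in {task_state}")
        return new_state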

    Hui Zheng

    09/22/2021, 12:43 AM
    Hello, recently, after I switched to an M1 MacBook, I found that my Prefect flow Docker build and registry push often freezes at this step:
    - prefect.Docker | Pushing image to the registry...
    Does it happen to other people?

    Abhas P

    09/22/2021, 1:10 AM
    Hi! Background: I was beginning to write a few basic tests for my flows, which basically extract data from a DB, transform it, and load it back into the DB. Intention: I want to test my custom flow in terms of composition and serialization, and the tasks in terms of high-level structuring, input/output, and mapping. Is there another place or point of reference for unit testing (think CI) real custom flows apart from the link on this page? (That page essentially points out how Prefect unit-tests its own platform, if I am not wrong.)
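    For what it's worth, a minimal sketch of what a flow-level unit test could look like (the flow and tasks are placeholders):
    from prefect import Flow, task

    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform(data):
        return [x * 2 for x in data]

    with Flow("etl-under-test") as flow:
        transform(extract())

    def test_composition():
        # structural check: exactly the expected number of tasks
        assert len(flow.tasks) == 2

    def test_run_succeeds():
        state = flow.run()
        assert state.is_successful()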

    Max Kureykin

    09/22/2021, 9:05 AM
    Hi guys! The question is about the prefect/dask combo. Is there any way to limit the dask scheduler so it does not push so many tasks onto the workers? In Prefect Cloud I've found the Task Concurrency feature, but I believe that will not work with dask. Thanks!

    Nadav Nuni

    09/22/2021, 1:21 PM
    Hey, I might have missed something in the docs, so I’ll ask here: I have a KubernetesAgent which runs all my tasks, and we want to make stuff parallel. Is it OK to use the mapping features (here) with a LocalExecutor? Or is there something else I need to do? The docs here state that I need a DaskExecutor for mapping to work… but I’m not sure which mapping, and I’m also not sure I understand why…
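    For context, mapped tasks run with any executor, but they only run in parallel on a Dask-backed one; a minimal sketch (the worker count is a placeholder):
    from prefect.executors import LocalDaskExecutor

    flow.executor = LocalDaskExecutor(num_workers=8)  # parallel mapping on one machine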

    Svyat

    09/22/2021, 3:14 PM
    Hi all, is there a documented recipe for deploying a Prefect flow on AWS ECS EC2? Specifically, I'm looking for a walkthrough of the run_config parameters of the ECSRun class.
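    Until such a walkthrough turns up, a sketch of the commonly used ECSRun parameters (Prefect 1.x; the image and ARNs are placeholders):
    from prefect.run_configs import ECSRun

    flow.run_config = ECSRun(
        image="my-registry/my-image:latest",
        cpu="512",
        memory="1024",
        execution_role_arn="arn:aws:iam::123456789012:role/my-execution-role",
        task_role_arn="arn:aws:iam::123456789012:role/my-task-role",
        labels=["ecs"],
    )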

    Lukáš Pravda

    09/22/2021, 4:22 PM
    Hi all, I’m trying to get auto-generated documentation from docstrings using Sphinx. The code base uses a lot of task decorators. I’ve done some research, but all the answers I found are about a year old, so I'm not sure whether things have developed further. Ideally, I’d like to have nice Sphinx auto-generated documentation using the automodule directive. Based on what I’ve found here and here, I can now get nice documentation provided I define each function manually using the autofunction directive (tedious and not ideal). I can achieve similar behaviour when I define the task using a class and a run() method (that would require a lot of changes in the code base, so not ideal either), and I’ve also come across a plugin for celery… So my question is: is there anything else I can do with the present version of prefect to set up Sphinx documentation using automodule? If so, could you point me in the right direction/provide me with a code snippet? If not, do you plan on making something like a prefect-sphinx plugin that would allow this functionality? Thanks in advance for any comment/suggestion!

    Jeremy Phelps

    09/22/2021, 5:21 PM
    Hi all, how do you authenticate to the Prefect GraphQL API using only Python's requests library? I need to cancel a bunch of flow runs, but the web interface is too cumbersome and the CLI tool doesn't support cancellation. I notice that the browser has a token that only lasts a few minutes.
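    A sketch of the usual shape with an API key instead of the short-lived browser token (the cancel mutation follows the Prefect 1.x Cloud API as I understand it; treat the details as assumptions):
    import requests

    API_KEY = "<your-api-key>"      # service-account or user API key
    FLOW_RUN_ID = "<flow-run-id>"   # placeholder

    resp = requests.post(
        "https://api.prefect.io/graphql",
        json={
            "query": """
                mutation($id: UUID!) {
                  cancel_flow_run(input: { flow_run_id: $id }) { state }
                }
            """,
            "variables": {"id": FLOW_RUN_ID},
        },
        headers={"Authorization": "Bearer " + API_KEY},
    )
    print(resp.json())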

    Anatoly Alekseev

    09/23/2021, 12:51 AM
    Hi all, not a big deal, but my flows receive a label "Fields" out of nowhere. Is this expected? I don't have that label anywhere at the creation step....

    Anh Nguyen

    09/23/2021, 2:40 AM
    Could you tell me how to use a class instance that is shared between tasks? Thanks!

    Sandip Viradiya

    09/23/2021, 6:48 AM
    Hello everyone, I am really sorry if I am asking this question in the wrong channel. I am trying to set up Prefect using Bitbucket as storage.
    STORAGE = Bitbucket(
        project="PrefectDemoECS",            # name of project
        repo="prefect",                      # name of repo in project
        path="demo.py",                      # location of flow file in repo
        workspace="sandipviradiya",
        cloud_username_secret="bb_username",
        cloud_app_password_secret="bb_app_password",
    )
    It is showing me a 404 error, as in the attached image. I checked the code https://github.com/PrefectHQ/prefect/blob/a2025983352d7e152da88358c3e578f3ce73778a/src/prefect/storage/bitbucket.py#L29 and it seems it is not selecting the Bitbucket cloud path and is only trying the Bitbucket server fetch. Any guidance will be really helpful.

    Sandip Viradiya

    09/23/2021, 6:52 AM
    Thanks in advance everyone

    Milly gupta

    09/23/2021, 8:24 AM
    Hi all, what is the best way to send an email notification when a flow completes? Is it a state handler?
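    A state handler is one way; a sketch attached at the flow level (the SMTP details are placeholders; Prefect 1.x also ships an EmailTask in prefect.tasks.notifications):
    import smtplib
    from email.message import EmailMessage

    def email_on_completion(flow, old_state, new_state):
        if new_state.is_finished():
            msg = EmailMessage()
            msg["Subject"] = f"Flow {flow.name} finished: {new_state}"
            msg["From"] = "alerts@example.com"  # placeholder
            msg["To"] = "me@example.com"        # placeholder
            with smtplib.SMTP("localhost") as smtp:  # placeholder SMTP host
                smtp.send_message(msg)
        return new_state

    # attach at flow level: Flow("my-flow", state_handlers=[email_on_completion])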

    John Lee

    09/23/2021, 10:12 AM
    Hi all, I wonder if someone can help me make sense of some errors I am encountering with GCP auth. I am trying to store GCP credentials on the agent as described here, so that my tasks are able to use Google Storage/BigQuery. I am setting PREFECT__CONTEXT__SECRETS__GCP_CREDENTIALS on the agent via a Helm chart to a string containing the JSON credentials. This propagates GOOGLE_APPLICATION_CREDENTIALS to each Prefect job (and the creds differ from what I set on the agent), but the variable is set to the JSON contents rather than to the path of a file in the container containing the credentials. This causes errors for the Prefect Google utilities and the Google API in Python. I can hack a fix by running something like the following, but I am wondering whether this is expected behaviour or I am setting up the agent incorrectly?
    from pathlib import Path
    import tempfile
    import os
    import google.auth

    # the env var arrives holding the JSON itself, so write it to a temp file
    # and point the var at that path instead (delete=False keeps the file
    # around after the handle is garbage-collected)
    creds = Path(tempfile.NamedTemporaryFile(delete=False).name)
    creds.write_text(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str(creds)
    google.auth.default()

    Ruslan Aliev

    09/23/2021, 12:36 PM
    Hello everyone! I use a local agent and am trying to bind a local folder to a folder inside a Docker container using volumes, but it looks like it doesn’t work, or I'm doing it wrong 👀:
    container_name = StringFormatter(name="Container name", template="scenario_{scenario_id}_task_{task_id}")
    container_id = create_container(image_name=image_name,
                                    container_name=container_name(scenario_id=scenario_id, task_id=task_id),
                                    # command=command,
                                    command=['ls'],
                                    volumes=['data:/home/data',
                                             'temp:/home/temp'],
                                    ).set_upstream(input_file_path)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    logs = get_container_logs(container_id=container_id, upstream_tasks=[status_code])
    log([logs])
    The log([logs]) output shows that the data and temp folders are missing. What am I missing here?
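    A guess at the cause, not a confirmed diagnosis: in Docker generally, a short name like data on the host side of a volume spec creates a named volume rather than a bind mount; binding a local folder requires an absolute host path, e.g.:
    # hypothetical absolute host paths, which Docker treats as bind mounts
    volumes=['/absolute/path/on/host/data:/home/data',
             '/absolute/path/on/host/temp:/home/temp']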

    Barbara Abi Khoriati

    09/23/2021, 12:38 PM
    Hey everyone, I'm new to Prefect (and AWS) and have a quick question. When deploying your ECS Agent inside an EC2 instance in the same AWS environment, do I still need to set up authentication for the container deployments and the S3 bucket with my scripts? Thanks!

Kevin Kho

09/23/2021, 2:00 PM
Hey @Barbara Abi Khoriati, I think you can give the EC2 instance a role with the needed permissions and it should be able to make the requests. That role is specified here so it can be passed to the agent, or through the RunConfig as shown in those docs.

Barbara Abi Khoriati

09/23/2021, 4:41 PM
@Kevin Kho Thanks for the reply! So, by setting that, it would not be necessary to set environment variables or a .aws/config.toml?

Kevin Kho

09/23/2021, 4:42 PM
I believe no