prefect-community

    Matthew Blau

    04/30/2021, 7:22 PM
    Hello all, I have been reading about GitHub storage and had a few questions regarding its use. Currently we use Docker images that we build with docker-compose and a minimal task to create, start, and gather the logs. This has the disadvantage of not having logs in the Prefect UI, so I was investigating GitHub storage as a way to get logs into the UI. My question is: with GitHub storage, what is the minimum that I need in the flow.py file for it to pull the actual flow down from GitHub? And does it automatically create tasks based on the tasks contained in the flow stored in GitHub? I am trying to wrap my head around how it works and appreciate any advice and guidance.
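    For reference, a minimal flow.py using GitHub storage might look something like the sketch below (written against the 0.14.x-era API; the repo and path are hypothetical placeholders). The flow and its tasks are still defined in this file; the storage object just tells the agent where to re-import the file from at run time.
    # Minimal sketch of a flow file that uses GitHub storage (0.14.x-era API).
    from prefect import Flow, task
    from prefect.storage import GitHub

    @task
    def say_hello():
        print("hello")

    with Flow("github-storage-example") as flow:
        say_hello()

    # The agent pulls this same file from GitHub at run time and re-imports it,
    # so every task the flow uses must be importable from this file.
    flow.storage = GitHub(
        repo="my-org/my-repo",   # hypothetical repository
        path="flows/flow.py",    # path to this file within the repo
        ref="main",              # branch or commit
    )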

    Enda Peng

    04/30/2021, 7:45 PM
    Maybe this is more than a Prefect question. I use Prefect to run a batch job every day. Usually I have a lot of files like file_1, file_2, … file_100, and each processing task is independent of the others. To parallelize the computation I have considered several options:
    • Option 1: Single flow + single agent + Dask + Dask workers. One flow processes multiple files, with a 1-to-1 mapping between task and file (see the mapped-task sketch after this list):
    from prefect import Flow, task

    @task
    def process_file(x):
        ...

    with Flow("process-files") as flow:
        [process_file(x) for x in file_names]
    It works fine with LocalRun.
    ◦ pro: Easy to set up; Dask has a friendly API to control resources.
    ◦ con: I have to set up the same running environment for every worker added to the Dask cluster.
    • Option 2: Multiple flows + multiple agents, with a 1-to-1 mapping between flow and file. E.g. I could create 10 Docker agents running on 10 hosts, then in a script create and run 100 flows, each processing one file, and let Prefect distribute the flows for me.
    ◦ pro: The computation module is shipped with the Docker image, so there is no per-worker setup.
    ◦ con: Not sure whether Prefect is supposed to do this workload-distribution duty. Even if yes, it is hard to control resource consumption.
    • Option 3: Single flow + K8s. Build an image for my computation module and register it with K8s first. Within one flow, create K8s tasks that request 100 pods to process the files.
    ◦ pro: K8s can deal with the workload distribution and adding nodes is easy; any agent would be fine as long as it can talk to the K8s API.
    ◦ con: The complexity of setting up K8s?
    Appreciate any thoughts and input here!
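    As a point of comparison for Option 1, a sketch using Prefect's native mapping with a Dask executor (0.14.x-era API; file names and the scheduler address are placeholders):
    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def process_file(name):
        ...  # per-file processing goes here

    file_names = [f"file_{i}" for i in range(1, 101)]

    with Flow("process-files-mapped") as flow:
        process_file.map(file_names)

    if __name__ == "__main__":
        # LocalDaskExecutor() gives single-machine parallelism; a distributed
        # DaskExecutor needs the same environment on every Dask worker.
        flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))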

    Carter Kwon

    04/30/2021, 8:08 PM
    Hello, I'm trying to better understand the pricing/usage model. We have an ETL job that needs to make ~10k API calls to 2 different endpoints, so ~20k calls total. We use Prefect's map and LocalDaskExecutor for parallelization. However, if my understanding is correct, a single flow run counts as ~20k task runs and would cost either
    20,000 * 0.0025 = $50
    or
    20,000 * 0.005 = $100
    depending on the plan you're on?

    Belal Aboabdo

    04/30/2021, 9:59 PM
    Hi, is there a way to build a flow into a Docker container without registering it? I can't seem to find it in the documentation, but I'm trying to do something like this in my deploy script:
    prefect build -p my_flow.py
    which throws this usage error
    Usage: prefect [OPTIONS] COMMAND [ARGS]...
    Try 'prefect -h' for help.
    
    Error: No such command 'build'.
    
    Exited with code exit status 2
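    One way to do this from Python instead of the CLI (a sketch against the 0.14.x-era API; registry and image names are placeholders) is to build the Docker storage object directly:
    # Build a flow's Docker storage without registering it.
    from prefect.storage import Docker

    from my_flow import flow  # the Flow object defined in my_flow.py

    storage = Docker(
        registry_url="myregistry.example.com",
        image_name="my-flow",
        image_tag="latest",
    )
    storage.add_flow(flow)
    storage.build(push=False)  # build the image locally; push it later in the deploy script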

    Sean Perry

    04/30/2021, 10:33 PM
    What have people come up with as a best practice for having tasks indicate that they failed? I could not find any example in the docs of a task that could fail, or of what failure would look like. Returning None is not a failure. Are only uncaught exceptions failures?
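    For reference, the two usual patterns (sketched against the 0.14.x-era API) are raising an ordinary exception or an explicit FAIL signal:
    from prefect import task
    from prefect.engine import signals

    @task
    def check_row_count(n):
        # Any uncaught exception marks the task run as Failed.
        if n < 0:
            raise ValueError("row count cannot be negative")
        # An explicit signal also fails the task, with a custom message shown in the UI.
        if n == 0:
            raise signals.FAIL("no rows were loaded")
        return n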
    👀 1

    Jeff Payne

    05/01/2021, 5:40 AM
    💃 Just arrived!
    👋 3

    Adam Roderick

    05/01/2021, 2:53 PM
    We re-deployed our ECS agents with the PREFECT__CLOUD__AUTH_TOKEN env var (instead of PREFECT__CLOUD__AGENT__AUTH_TOKEN), set to the new API key. The agents start up and show green on the agent status page in Cloud, but the scheduled flows never start (they eventually get resurrected but error out on the third try). What am I doing wrong?

    Trevor Kramer

    05/01/2021, 6:47 PM
    from prefect import Flow, task
    
    @task
    def add_ten(x, y):
        return x + y
    
    @task()
    def log_result(x):
        print(x)
    
    
    with Flow('simple map') as flow:
        mapped_result = add_ten.map([1, 2, 3], [10, 11, 12])
        log_result(mapped_result)
    if __name__ == '__main__':
        from prefect.run_configs import LocalRun
        flow.run_config = LocalRun()
        flow.run()
    I was expecting this code to return 9 results instead of the 3 actually returned. Is there a way to have map enumerate all pairs (the full cross product)? I was assuming that because neither argument was marked as unmapped, they would both be looped over.
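    A sketch of one way to get all 9 combinations: Prefect's map zips its iterables elementwise, so the cross product has to be built up front (e.g. with itertools.product) and then mapped over:
    from itertools import product
    from prefect import Flow, task

    @task
    def add(x, y):
        return x + y

    @task
    def log_result(values):
        print(values)

    xs = [1, 2, 3]
    ys = [10, 11, 12]
    pairs = list(product(xs, ys))  # the 9 (x, y) combinations

    with Flow("cross-product map") as flow:
        results = add.map(
            [x for x, _ in pairs],
            [y for _, y in pairs],
        )
        log_result(results)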

    Robert Bastian

    05/01/2021, 11:23 PM
    Hello. I'm having some issues with mapped task parallel execution. I'm testing locally using LocalDaskExecutor(). Can you confirm that separate mapped tasks without a dependency will execute serially? In the example below the mapped tasks within 'a' and 'b' execute in parallel, but 'a' and 'b' execute serially.
    with Flow("testing") as flow:
        a = poll.map(poll_interval=[5,10])
        b = poll.map(poll_interval=[4,9])
    
    flow.run(executor=LocalDaskExecutor())
    [2021-05-01 18:20:08-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'testing'
    [2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Starting task run...
    [2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Finished task run for task with final state: 'Mapped'
    [2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Starting task run...
    [2021-05-01 18:20:08-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState': Finished task run for task with final state: 'Mapped'
    [2021-05-01 18:20:09-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Starting task run...
    [2021-05-01 18:20:09-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Starting task run...
    [2021-05-01 18:20:14-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Finished task run for task with final state: 'Success'
    [2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Finished task run for task with final state: 'Success'
    [2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Starting task run...
    [2021-05-01 18:20:19-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Starting task run...
    [2021-05-01 18:20:23-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[0]': Finished task run for task with final state: 'Success'
    [2021-05-01 18:20:28-0500] INFO - prefect.TaskRunner | Task 'PollDSCCaptureState[1]': Finished task run for task with final state: 'Success'
    [2021-05-01 18:20:28-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    Thx!

    Jason Prado

    05/02/2021, 2:17 AM
    I’m having some trouble understanding whether I should be able to run
    python myflow.py
    and read Secrets within my flow. I’ve added the Secrets in the Cloud UI and authenticated with the prefect CLI. Is the right model that “running a flow locally without an agent never hits the server” or am I mistaken?
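    For reference, a sketch of reading a Secret inside a task (0.14.x-era API; the secret name is a placeholder). Whether the lookup goes to Cloud or to a local value depends on the backend and the use_local_secrets setting:
    from prefect import Flow, task
    from prefect.client import Secret

    @task
    def use_secret():
        value = Secret("MY_API_KEY").get()
        ...  # use the secret value

    with Flow("secret-example") as flow:
        use_secret()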

    Newskooler

    05/02/2021, 11:55 AM
    Hey Prefect community! 👋 Is there a way to control the order of mapped executions (I use the local Dask executor)? I noticed that once mapped (and scheduled), the actual execution order is quite random, whereas in my case I need the tasks to run in the order I provided them. Any help would be much appreciated! 🙂

    Enda Peng

    05/02/2021, 4:15 PM
    Got an issue when I use ShellTask. I have a long-running task that creates tons of logs. If I enable stream_output, it overwhelms my disk. If I don't, Prefect attempts to kill the task after several hours due to the heartbeat check.

    Rehan Razzaque Rajput

    05/03/2021, 7:33 AM
    Hi everyone, I'm new to prefect and I had a question for which I couldn't find an answer online: Can we run multiple separate flows at the same time, in parallel? I understand that we can parallelize tasks within a Flow. But I'm asking about running Flows in parallel. Thanks

    Yohann

    05/03/2021, 7:40 AM
    Hi community 🙂 I'm new and I need some help with a very simple problem. I've been testing Prefect for a few days, and I'm trying to pass environment variables to a flow. It seems easy, but it doesn't work for me. I have configured the flow like this: flow.run_config = LocalRun(env={"GREETING": "Hello"}). When I call flow.run, os.environ doesn't contain the GREETING variable. Do you know why? This is for local testing and I don't want to register the flow right now. Thank you!
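    A sketch of the local-only case (assuming, as the behaviour suggests, that run configs are applied by an agent rather than by a plain flow.run()): setting the variable in the process environment before calling flow.run() makes it visible to the tasks:
    import os
    from prefect import Flow, task

    @task
    def greet():
        print(os.environ.get("GREETING"))

    with Flow("greeting") as flow:
        greet()

    if __name__ == "__main__":
        os.environ["GREETING"] = "Hello"  # visible inside tasks run in this process
        flow.run()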

    Peter Roelants

    05/03/2021, 8:24 AM
    Hi Prefect, Is there a way to create a custom Task that can be visualized in the Prefect UI similar to Parameter tasks? I currently get most of my parameter variables from environment variables being set (and want to continue doing this). Ideally I want to group related parameters into groups (e.g. by using data classes) so the resulting groups can be passed around to wherever they are needed. For example, I currently fetch my parameters to communicate with my Kafka service via this custom task:
    import os
    from datetime import timedelta

    import prefect


    class GetKafkaConfig(prefect.Task):
        """Get the Kafka configuration from environment variables."""

        def run(self, timeout: timedelta) -> KafkaConfig:
            # KafkaConfig is a user-defined dataclass (not shown here).
            return KafkaConfig(
                broker_address=os.environ['KAFKA_BROKER_ADDRESS'],
                topic_name=os.environ['KAFKA_TOPIC_NAME'],
                timeout=timeout,
            )
    I would like to visualize these parameters in my Prefect UI (and ideally make them editable), similar to how Parameter tasks can be visualized (e.g. as is demonstrated in https://docs.prefect.io/orchestration/tutorial/hello-flow-run-parameter-config.png). Is it possible to write custom visualization layers for custom Tasks?
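    One sketch of a workaround (hypothetical names; note that Parameter defaults built this way are captured when the flow is registered, so they only suit values that are stable between registrations): expose the individual values as Parameters, which the UI already displays and lets you edit, and assemble the config object in a downstream task:
    import os
    from prefect import Flow, Parameter, task

    @task
    def build_kafka_config(broker_address, topic_name):
        # KafkaConfig is the user-defined dataclass from the snippet above.
        return KafkaConfig(broker_address=broker_address, topic_name=topic_name)

    with Flow("kafka-flow") as flow:
        broker = Parameter("kafka_broker_address",
                           default=os.environ.get("KAFKA_BROKER_ADDRESS"))
        topic = Parameter("kafka_topic_name",
                          default=os.environ.get("KAFKA_TOPIC_NAME"))
        config = build_kafka_config(broker, topic)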

    merlin

    05/03/2021, 9:22 AM
    message has been deleted

    Domantas

    05/03/2021, 10:15 AM
    Hello guys, I have a question related to task RAM usage: is it possible to save results using S3 storage (or another storage) without keeping all results in RAM? I would like to process a large file that is split into x subfiles: load each subfile in a different task, perform the necessary operations, save it to a pickle file, and then load that pickle file in another task. The goal is to keep the tasks organised in the Prefect flow (I would like to track each subfile operation as its own task) while keeping RAM usage as low as possible by not storing all the data in RAM (only one subfile at a time). For now I'm trying to use S3 results (https://docs.prefect.io/orchestration/execution/storage_options.html#aws-s3), but it doesn't seem to free up RAM when the result is saved to the pickle file. Any ideas on this problem?
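    A sketch of attaching an S3Result per task (0.14.x-era API; the bucket name is a placeholder). Note that results are checkpointing for retries and inspection; a task's return value still lives in memory while downstream tasks hold a reference to it, so returning something lightweight (e.g. the S3 key or subfile path) rather than the data itself is what actually bounds RAM usage:
    from prefect import Flow, task
    from prefect.engine.results import S3Result

    @task(result=S3Result(bucket="my-results-bucket"))
    def process_subfile(path):
        ...  # load one subfile, transform it, write it out, return a small reference
        return path

    with Flow("large-file") as flow:
        keys = process_subfile.map(["part-000", "part-001"])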

    g.suijker

    05/03/2021, 12:04 PM
    Hi all, I'm using Docker storage to store my flow within a Docker image on azure container registry. However when I change my flow and push the image to the registry (with the same tag as the previous version) it appears that when I run the flow with the Cloud UI, the flow is not run with the latest image containing the changes. Any ideas on why the agent is not using the latest image?

    Ranu Goldan

    05/03/2021, 12:52 PM
    Hi everyone, when a task fails, what happens to the downstream tasks? I was expecting the downstream tasks to fail as well, but they just stay in a Pending state. Is that expected behaviour from a Prefect flow?

    Gage Toschlog

    05/03/2021, 3:40 PM
    Is it possible to name a flow run with the flow.run() function? We are currently using the client.create_flow_run() function, but we need this call to be synchronous so as not to overload our data warehouse. Appreciate any advice!
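    A sketch of one way to do this with the Client (0.14.x-era API; the flow ID and run name are placeholders): pass run_name when creating the run, then poll until the run reaches a finished state so only one run hits the warehouse at a time:
    import time
    from prefect import Client

    client = Client()
    flow_run_id = client.create_flow_run(
        flow_id="<flow-id>",
        run_name="warehouse-load-2021-05-03",
    )

    # Block until the run finishes before kicking off the next one.
    while True:
        state = client.get_flow_run_info(flow_run_id).state
        if state and state.is_finished():
            break
        time.sleep(30)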

    Belal Aboabdo

    05/03/2021, 5:59 PM
    Hi everyone, I'm having trouble registering my flow to Cloud and am getting this Docker error:
    docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/images/create?tag=0.14.0-python3.9&fromImage=prefecthq%2Fprefect>: Not Found ("manifest for prefecthq/prefect:0.14.0-python3.9 not found: manifest unknown: manifest unknown")

    Nathan Atkins

    05/03/2021, 6:04 PM
    I have a mapped task and wanted to set the task name dynamically with
    @task(task_run_name=name_fn)
    where name_fn() dynamically generates the task name from the kwargs that are passed to it. This all works great when I'm running with the UI. When I run directly by calling flow.run(), the set_task_run_name() in engine/task_runner.py is stubbed out and doesn't call my name_fn(). I can see that in TaskRunner.run() the call to set_task_run_name() isn't totally straightforward. What would it take to get set_task_run_name() to work when running directly without the UI?
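    For reference, a sketch of the pattern being described (0.14.x-era API; the task and name function are illustrative) — a task_run_name callable that builds the run name from the task's call-time inputs:
    from prefect import Flow, task

    def name_fn(**kwargs):
        return f"process-{kwargs['item']}"

    @task(task_run_name=name_fn)
    def process(item):
        ...

    with Flow("named-runs") as flow:
        process.map(item=["a", "b", "c"])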

    Sean Perry

    05/03/2021, 6:09 PM
    https://docs.prefect.io/core/task_library/control_flow.html is raising a 404, linked from here: https://docs.prefect.io/core/examples/conditional.html#conditional-tasks

    Joseph Loss

    05/03/2021, 7:14 PM
    happy monday everyone!
    👋 3
    ☕ 2

    Joseph Loss

    05/03/2021, 7:17 PM
    Does anyone know what is going on here? In every task, I use logger = prefect.context.get('logger') and then call logger.info(), logger.debug(), etc.
    D:\venv\poetry\.venv\lib\site-packages\prefect\utilities\logging.py:123:
    UserWarning: Failed to write logs with error:
    ClientError('400 Client Error: Bad Request for url: https://api.prefect.io/graphql
    The following error messages were provided by the GraphQL server:
        INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
            "input.logs[0].flow_run_id"; Expected non-nullable type UUID! not to be null.
        INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
            "input.logs[2].flow_run_id"; Expected non-nullable type UUID! not to be null.
        INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
            "input.logs[4].flow_run_id"; Expected non-nullable type UUID! not to be null.

    Braun Reyes

    05/03/2021, 7:22 PM
    are there any plans to add webhook as an action for automations?

    Enda Peng

    05/03/2021, 8:15 PM
    I have a project with this structure (the two flows share some functions and configs):
    project/
    • flow1.py
    • flow2.py
    • utils/
    • config/
    I'd like to pack the two flows into one Docker image, but I don't see a prefect command that can do this. E.g. if I specify storage for both flow1 and flow2 with the same image name, they overwrite each other when I call
    prefect register
    So I tried to replicate the behaviour of building storage by writing my own Dockerfile. A question here is: how do the files healthcheck.py and flow1.flow come into scope? Below is the output after I call the register command with Docker storage:
    Step 7/12 : COPY flow1.flow /opt/prefect/flows/flow1.prefect
     ---> ea7572a6c0e7
    Step 8/12 : COPY healthcheck.py /opt/prefect/healthcheck.py
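    A sketch of one approach (0.14.x-era API; registry, image, path, and project names are placeholders): build a single Docker storage object that both flows are added to, then register each flow with build=False so neither registration rebuilds (and overwrites) the image:
    from prefect.storage import Docker

    from flow1 import flow as flow1
    from flow2 import flow as flow2

    storage = Docker(
        registry_url="myregistry.example.com",
        image_name="my-flows",
        files={"/path/to/project/utils": "/opt/prefect/utils",
               "/path/to/project/config": "/opt/prefect/config"},
        env_vars={"PYTHONPATH": "/opt/prefect"},
    )
    storage.add_flow(flow1)   # registers flow1 with this storage object
    storage.add_flow(flow2)
    storage.build()           # the serialized flows and healthcheck.py are copied into the image here

    for f in (flow1, flow2):
        f.storage = storage
        f.register(project_name="my-project", build=False)  # don't rebuild per flow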

    Trevor Kramer

    05/04/2021, 1:27 AM
    Is there any way to see in the cloud ui what executor the flow is registered with? I'm not seeing the task concurrency I am expecting and want to confirm the executor settings.

    jaehoon

    05/04/2021, 2:30 AM
    Does anyone know what is going on here? When getting API responses from google-ads, I keep getting this error in the Prefect run logs:
    Unexpected error: ReferenceError('weakly-referenced object no longer exists')
    The API response count is almost 2,000, and the error does not occur in my local environment. Someone please help me!

    Stéphan Taljaard

    05/04/2021, 8:18 AM
    Hi. I was looking for more information on tenants on the docs site, but only found brief mentions here and there. I found someone with a similar question. Are there any plans to expand on the docs? What exactly is the role of a tenant, and do I still need to run
    prefect server create-tenant
    when using Prefect Server?

Mariia Kerimova

05/04/2021, 1:13 PM
Hello Stéphan! Yes, improving the docs is on our roadmap and will be addressed sometime soon. At the moment you need to create a tenant (team) for Prefect Server. It's similar to Prefect Cloud, but Server doesn't include auth benefits such as inviting users to a specific tenant, etc.
🙏 1

Stéphan Taljaard

05/04/2021, 1:22 PM
I guess the reason why it is called tenant and not team is non-trivial?

Mariia Kerimova

05/04/2021, 1:26 PM
I’m not sure why it’s called tenant 🤔, but I totally understand the confusion.