prefect-community
  •

    Adam Roderick

    12/30/2020, 6:09 PM
    Is it safe to store secrets in Prefect's context? Or will they get persisted or logged anywhere?
    3 replies · 2 participants
  •

    Marwan Sarieddine

    12/30/2020, 6:21 PM
    Hi folks, we have Slack notifications implemented as state handlers on task runs, and this has worked well to notify us about flow run failures. But there are times when the flow fails before reaching a Submitted/Running state (e.g. because no heartbeat was detected, or because Lazarus failed to reschedule the run). What is the best practice for enabling Slack notifications in these cases?
    3 replies · 2 participants
  •

    Adam Roderick

    12/30/2020, 6:37 PM
    I have a class that inherits from Flow. When I run the flow locally by calling flow.run(), I can see log messages from the overridden run() method (which then calls the parent class's run() method). However, when running in Fargate, I do not see the log messages. Does flow.run() not get called?
    2 replies · 2 participants
  •

    Alfie

    12/31/2020, 1:35 AM
    Hey team, I’d like to know whether a flow's cron schedule can be timezone-aware. The host uses UTC, but I want the cron schedule to execute in a specific timezone, for example every day at 2pm local time. Thanks.
    2 replies · 2 participants
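A cron expression on its own carries no timezone, so "2pm local time" has to be evaluated in a named zone, and the matching UTC hour shifts with daylight saving time. The sketch below uses only the standard library (`zoneinfo`, Python 3.9+) to show that shift for an assumed zone, `America/New_York`; it is not Prefect code. In Prefect itself, the option worth checking in the schedule docs is giving the cron clock a timezone-aware `start_date` (an assumption here, not confirmed in this thread).

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

# Assumed target zone for illustration; swap in the zone you actually need.
LOCAL_TZ = ZoneInfo("America/New_York")


def daily_runs_utc(start: date, days: int, at: time = time(14, 0)) -> list[datetime]:
    """Return `days` consecutive daily run times at local time `at`, converted to UTC."""
    runs = []
    for offset in range(days):
        local_day = start + timedelta(days=offset)
        # Attaching the named zone resolves the correct offset (EST vs. EDT) per date.
        local_run = datetime.combine(local_day, at, tzinfo=LOCAL_TZ)
        runs.append(local_run.astimezone(timezone.utc))
    return runs


if __name__ == "__main__":
    # 2pm New York time is 19:00 UTC in winter and 18:00 UTC in summer.
    for run in daily_runs_utc(date(2021, 1, 4), 1) + daily_runs_utc(date(2021, 7, 5), 1):
        print(run.isoformat())
```

Converting from a named zone at each run, rather than storing a fixed UTC hour, is what keeps the local run time stable across the DST switch.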
  •

    Scott Moreland

    12/31/2020, 5:08 PM
    Any advice for "always running" tasks? I'm reading ticker data from a websocket connection (running on its own thread) and performing actions on the websocket feed using a flow that runs on a schedule. I'm curious whether others have advice for this type of situation. It would be nice to have the websocket running inside a persistent task on its own Dask worker. The pseudocode is shown below:
    ws = instantiate_websocket_feed() # persistent connection run on own thread
    
    # this flow gets run on a schedule every few seconds
    with Flow("execute-trades", schedule) as flow:
        latest_prices = get_prices(ws)
        execute_trades(latest_prices)
    Is there a better way to structure this?
    4 replies · 2 participants
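One common way to structure this, sketched here with only the standard library rather than Prefect or Dask, is to let the long-lived feed thread keep overwriting a thread-safe "latest prices" snapshot and have each scheduled run read that snapshot instead of touching the connection. `PriceBook`, `fake_feed`, and `run_once` are hypothetical stand-ins for the websocket client and tasks in the question.

```python
import threading
import time


class PriceBook:
    """Thread-safe holder for the most recent price per ticker (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._prices: dict[str, float] = {}

    def update(self, ticker: str, price: float) -> None:
        with self._lock:
            self._prices[ticker] = price

    def snapshot(self) -> dict[str, float]:
        # Return a copy so callers never see (or mutate) the live dict.
        with self._lock:
            return dict(self._prices)


def fake_feed(book: PriceBook, stop: threading.Event) -> None:
    """Stand-in for the websocket loop: keeps pushing ticks until told to stop."""
    tick = 0
    while not stop.is_set():
        tick += 1
        book.update("ABC", 100.0 + tick)
        time.sleep(0.01)


def run_once(book: PriceBook) -> dict[str, float]:
    """What each scheduled run would do: read the latest snapshot and act on it."""
    latest_prices = book.snapshot()
    # execute_trades(latest_prices) would go here in the real flow.
    return latest_prices


if __name__ == "__main__":
    book, stop = PriceBook(), threading.Event()
    feeder = threading.Thread(target=fake_feed, args=(book, stop), daemon=True)
    feeder.start()
    time.sleep(0.1)
    print(run_once(book))
    stop.set()
    feeder.join()
```

The scheduled work only ever receives a plain dict. A live socket generally cannot be pickled, so passing `ws` itself into a task that may run on a remote Dask worker is likely to fail whenever it has to be serialized.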
  •

    Adam Roderick

    12/31/2020, 6:53 PM
    The Dockerfile method of building Docker storage leaves temporary directories like `tmp3v36vta4` behind after registration with Cloud. This is cluttering up our repository. Is there a setting to avoid this?
    6 replies · 2 participants
  •

    Brett Naul

    12/31/2020, 7:42 PM
    Quick question: we have a bunch of flow runs from several months ago stuck in "Running"; the state messages look like `[11 September 2020 5:46pm]: Flow run is cancelling`. Does anyone know of a way to clear all of these out so they don't count against our concurrency limit?
    1 reply · 2 participants
  •

    bral

    01/02/2021, 7:54 AM
    Hi folks. I am using a Dask executor to run tasks on multiple machines via `map()`. However, a single task sometimes takes a very long time to process, and because I work with pandas, I cannot parallelize the computation within each mapped task. I would like to use Modin for this. Has anyone tried something like this?
    2 replies · 2 participants
  •

    Equipe AI HOC

    01/02/2021, 11:28 AM
    Hello. I am unsure whether to use a trigger or a state handler. I want some tasks to check a context value or parameter to decide whether they should run.
    2 replies · 2 participants
  •

    Adam Roderick

    01/02/2021, 1:49 PM
    I see that `flow.register()` initializes a `Client` object with no arguments: https://github.com/PrefectHQ/prefect/blob/4d8337f75fe9bbb3024faa3b74a8f7debbb596d0/src/prefect/core/flow.py#L1663-L1665. From what I can tell, this will use the api_token value from `config.cloud.auth_token` (https://docs.prefect.io/api/latest/client/client.html). What should we do in a build server scenario? Will `Client()` read from an environment variable if one is set?
    4 replies · 2 participants
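As far as I know, Prefect's 0.x configuration can be overridden by environment variables named `PREFECT__` followed by the config path in upper case, with sections joined by double underscores, so `config.cloud.auth_token` should correspond to `PREFECT__CLOUD__AUTH_TOKEN`. The config is loaded when `prefect` is imported, so the variable needs to be set before the registration script starts. The helpers below only spell out that naming rule with the standard library; `prefect_env_var` and `build_env` are hypothetical conveniences for a build script, not part of Prefect.

```python
import os
from typing import Mapping, Optional


def prefect_env_var(config_key: str) -> str:
    """Map a dotted config key such as "cloud.auth_token" to PREFECT__CLOUD__AUTH_TOKEN."""
    return "PREFECT__" + "__".join(part.upper() for part in config_key.split("."))


def build_env(token: str, base: Optional[Mapping[str, str]] = None) -> dict:
    """Copy `base` (default: the current environment) and add the auth token variable."""
    env = dict(os.environ if base is None else base)
    env[prefect_env_var("cloud.auth_token")] = token
    return env


if __name__ == "__main__":
    print(prefect_env_var("cloud.auth_token"))  # PREFECT__CLOUD__AUTH_TOKEN
```

A CI job could then launch its (hypothetical) registration script with `subprocess.run([...], env=build_env(secret))`, or simply export the variable in the build server's secret settings, keeping the token out of `config.toml`.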
  •

    Marwan Sarieddine

    01/02/2021, 5:15 PM
    Hi folks, a question about `manual_only` triggers for a mapped task: is there a way to approve all child tasks at once, or do I have to click through them manually? This gets unwieldy quite quickly.
    6 replies · 2 participants
  •

    Adam Roderick

    01/02/2021, 5:33 PM
    Where should I look to learn how to upgrade from environments to run configs?
    2 replies · 2 participants
  •

    Adam Roderick

    01/02/2021, 7:33 PM
    In the AWS execution environment, where are the flow run's container logs stored?
    5 replies · 2 participants
  •

    Adam Roderick

    01/02/2021, 11:24 PM
    I'm trying to use ECSRun, but I'm getting an error in the flow run log: "An error occurred (ClusterNotFoundException) when calling the RunTask operation: Cluster not found." Any ideas what to do about this?
  •

    wiretrack

    01/03/2021, 12:05 AM
    Hey all, happy new year everyone. Hope everyone stays safe and healthy! I’m currently experimenting with an ETL architecture, and I read in the docs that a Dask cluster is the preferred way of running a flow. But I couldn’t find examples of using the executor with the functional API, only with the imperative API (`flow.run(executor=Dask`). I did, however, start a Dask cluster locally and set the environment variables as per the docs, but my tasks still run synchronously rather than in parallel, and I can’t see anything in the Dask workers' logs. Is there anything I should be doing to get the speed and benefits of parallel and distributed computation?
    3 replies · 2 participants
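The underlying idea is that tasks only overlap when a parallel-capable executor actually runs the flow and the tasks do not depend on each other; whether the flow was built with the functional or the imperative API should not matter. The sketch below illustrates that with the standard library's `concurrent.futures` (not Prefect or Dask): four independent 0.2-second "tasks" take about 0.8 s one after another but about 0.2 s on a thread pool. In Prefect 0.14 the analogous step would be attaching a Dask executor to the flow (via `flow.run(executor=...)` or the flow's executor setting); check the executor docs for the exact import path and attribute in your version.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_task(seconds: float) -> float:
    """Stand-in for an independent, I/O-bound task."""
    time.sleep(seconds)
    return seconds


def run_sequential(durations: list[float]) -> float:
    """Run tasks one after another, like a non-parallel executor; return elapsed seconds."""
    start = time.perf_counter()
    for d in durations:
        fake_task(d)
    return time.perf_counter() - start


def run_parallel(durations: list[float]) -> float:
    """Submit all independent tasks at once to a pool; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(durations)) as pool:
        # map submits every task up front, so the sleeps overlap.
        list(pool.map(fake_task, durations))
    return time.perf_counter() - start


if __name__ == "__main__":
    durations = [0.2] * 4
    print(f"sequential: {run_sequential(durations):.2f}s")
    print(f"parallel:   {run_parallel(durations):.2f}s")
```

Threads are enough here because `time.sleep` releases the GIL; CPU-bound pandas work generally needs processes or separate Dask workers to see the same speed-up.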
  •

    Adam Roderick

    01/03/2021, 1:29 AM
    Still trying to work out the ECSRun config. The task definitions that get created do not seem to support Fargate, so if the agent's launch type is FARGATE, it attempts to launch a Fargate task but errors. I thought I could get around this by supplying my own task definition, but now the agent reports the error
    Error while deploying flow: AttributeError("'str' object has no attribute 'get'")
    2 replies · 1 participant
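`AttributeError: 'str' object has no attribute 'get'` is what Python raises when `.get(...)` is called on a string, which suggests (an assumption, not confirmed in this thread) that the custom task definition reached the agent as raw text where a dictionary was expected. The standard-library sketch below parses a JSON task definition into a `dict` first and checks the fields AWS requires for Fargate tasks: `requiresCompatibilities` including `FARGATE`, `networkMode` set to `awsvpc`, and task-level `cpu` and `memory`. `load_fargate_task_definition` is a hypothetical helper, not part of Prefect's `ECSRun`.

```python
import json

FARGATE_REQUIRED_KEYS = ("cpu", "memory")


def load_fargate_task_definition(text: str) -> dict:
    """Parse a JSON task definition and check the fields Fargate requires."""
    task_def = json.loads(text)
    if not isinstance(task_def, dict):
        raise TypeError(f"expected a JSON object, got {type(task_def).__name__}")
    problems = []
    if "FARGATE" not in task_def.get("requiresCompatibilities", []):
        problems.append("requiresCompatibilities must include FARGATE")
    if task_def.get("networkMode") != "awsvpc":
        problems.append("networkMode must be awsvpc")
    for key in FARGATE_REQUIRED_KEYS:
        if key not in task_def:
            problems.append(f"task-level {key} is required")
    if problems:
        raise ValueError("; ".join(problems))
    return task_def


if __name__ == "__main__":
    raw = json.dumps({
        "family": "prefect-flow",  # hypothetical family name
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",
        "cpu": "256",
        "memory": "512",
    })
    task_def = load_fargate_task_definition(raw)
    print(type(task_def).__name__, task_def["networkMode"])  # dict awsvpc
```

The point is to hand over the parsed dictionary rather than the string; check the `ECSRun` docs for which argument takes a dictionary and which takes a file path in your Prefect version.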
  •

    Equipe AI HOC

    01/03/2021, 5:16 PM
    Hello. Which executor does the local agent use? I've set `LocalDaskExecutor` on the flow right before registering (Cloud backend), but when I execute it via the UI it does not seem to be parallelizing.
    1 reply · 2 participants
  •

    Vitaly Shulgin

    01/03/2021, 5:33 PM
    Hello, I tried to use a custom adjustment in a schedule and got an error. Is there any way to support custom adjustments? I need to randomize the scheduled times. The error is:
    custom functions aren't supported for `Schedule.adjustments`.
  •

    wiretrack

    01/04/2021, 12:46 AM
    Hey all. Is anybody else having trouble with the `server`? I’ve been getting a long traceback while running the server locally. It seems that a `flow_run` is stuck, cached somewhere, and some process is still firing a log. I think this happened after I cancelled and deleted a few flows and flow runs. I already uninstalled the server and installed it again, but with no success.
    8 replies · 2 participants
  •

    Sagun Garg

    01/04/2021, 3:55 AM
    Requesting guidance for a code sample 🙏 For event-based triggers, I believe we have to use the GraphQL API and the Prefect Client. Use case: based on certain events, I wish to spawn parameterized flows on my Prefect Cloud in an ad hoc fashion. The code that triggers the flow on an event lives outside the dockerized repo of the flows. [Flow ID vs. flow name] in a GraphQL mutation via the Prefect Client: the mutation listed in the examples requires a flow ID. Is there a better-supported syntax for triggering a flow by its flow name? I went through the following example: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a
    7 replies · 2 participants
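One way to trigger by name without hard-coding an ID is two steps: look up the newest flow ID for a name, then create the run with that ID. The standard-library sketch below only builds the lookup payload and parses a response; the query shape (`flow`, `where`/`_eq`, `archived`, `order_by`, `limit`) is an assumption based on the Hasura-style filters Prefect's API used at the time, so check it against the schema in the UI's Interactive API tab first. Sending the payload (for example with `Client.graphql`) is left out, and the flow name and response below are made up.

```python
import json

# Assumed query shape: newest (highest version) non-archived flow with this name.
FLOW_ID_BY_NAME = """
query FlowIdByName($name: String!) {
  flow(
    where: {name: {_eq: $name}, archived: {_eq: false}}
    order_by: {version: desc}
    limit: 1
  ) {
    id
  }
}
"""


def lookup_payload(flow_name: str) -> dict:
    """Build the JSON body for a GraphQL request that looks up a flow ID by name."""
    return {"query": FLOW_ID_BY_NAME, "variables": {"name": flow_name}}


def flow_id_from_response(response: dict) -> str:
    """Extract the ID from a response shaped like {"data": {"flow": [{"id": ...}]}}."""
    flows = response["data"]["flow"]
    if not flows:
        raise LookupError("no flow with that name")
    return flows[0]["id"]


if __name__ == "__main__":
    print(json.dumps(lookup_payload("my-etl-flow")["variables"]))  # hypothetical flow name
    fake_response = {"data": {"flow": [{"id": "1234"}]}}  # made-up response for illustration
    print(flow_id_from_response(fake_response))
```

The returned ID could then be passed to `Client.create_flow_run(flow_id=..., parameters=...)` from the Lambda function.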
  •

    Yannick

    01/04/2021, 10:08 AM
    Hi everyone, I am evaluating whether I can build on top of Prefect (as a backend for Orchest) and am left with a few questions about serialization, caching, and data passing between steps in a distributed setting. Any help would be greatly appreciated :)
    • I understand that serialization is done using `cloudpickle` so that Dask workers can pass the data. Does that mean that data passing between tasks is completely in the hands of Dask? That is, is the data passed over the network when Dask schedules the tasks on different machines in the cluster?
    • About large data, the docs say: "Don't worry about passing large data objects between tasks. As long as it fits in memory, Prefect can handle it with no special settings." What exactly has to fit in memory here: the sum of all output data in the flow, or is there some sort of eviction going on? Example: for a flow like A --> B, A --> C, and B --> D, must the outputs of A and B both fit completely in memory? Secondly, the docs say: "(...) If it doesn't, there are ways to distribute the operation across a cluster." How would I go about doing that?
    • For input caching, is there any way to configure how it works? The docs state: "Input caching is an automatic caching." I would like additional control over input caching.
    Many, many thanks! 🙏
    4 replies · 2 participants
  •

    as

    01/04/2021, 1:02 PM
    Hey, I don't understand how validators work for results. For example, in the following task:
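    from os.path import join
    from prefect import task
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import JSONSerializer
    # get_data and p (a user config object) are defined elsewhere in the user's code
    task(get_data,
        result=LocalResult(serializer=JSONSerializer(), validators=lambda: False),
        target=join(p.data_path, "data.json"),
        checkpoint=True,
    )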
    If a target file already exists here, is the task run anyway because the validator returns False? What I want to achieve is to use a kind of "force_run" variable in a result validator (e.g. defined in a user config) that forces a task to be evaluated even if there is already a result at the target location.
    6 replies · 3 participants
  •

    Equipe AI HOC

    01/04/2021, 1:06 PM
    Hello! A question about caching: I've read about `cache_for`, `cache_validator`, and `cache_key` in the docs. From what I understood, these will not cache between flow runs (different processes) unless you configure `target`, `result`, and `checkpoint`. Is that correct? When I configure these last three "persistence" options, results are cached between flow runs, but they don't seem to integrate with `cache_validator` (or any of the first three "cache" options, for that matter). Is that correct? Are these independent mechanisms? What I want to achieve, put simply, is to cache between flow runs/processes and have a function (`cache_validator`) check whether the cache/file is still valid.
    4 replies · 2 participants
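Whatever the answer about Prefect's own mechanisms, the behaviour asked for here (a cache that survives across processes, a function that decides whether it is still valid, plus the `force_run` override from the previous thread) can be sketched with the standard library alone. This is not Prefect's caching implementation: `cached_call`, the JSON file layout, and the age-based validator are hypothetical choices for illustration.

```python
import json
import tempfile
import time
from pathlib import Path
from typing import Any, Callable


def max_age_validator(seconds: float) -> Callable[[Path], bool]:
    """Build a validator that accepts a cache file modified less than `seconds` ago."""
    def is_valid(path: Path) -> bool:
        return time.time() - path.stat().st_mtime < seconds
    return is_valid


def cached_call(
    fn: Callable[[], Any],
    target: Path,
    validator: Callable[[Path], bool],
    force_run: bool = False,
) -> Any:
    """Return the JSON result stored at `target` if it exists and is valid; else recompute."""
    if not force_run and target.exists() and validator(target):
        return json.loads(target.read_text())
    result = fn()
    target.parent.mkdir(parents=True, exist_ok=True)
    # Persisting to disk is what lets a later process (a later flow run) reuse it.
    target.write_text(json.dumps(result))
    return result


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "data.json"
        fresh_for_an_hour = max_age_validator(3600)
        print(cached_call(lambda: {"rows": 3}, target, fresh_for_an_hour))  # computed
        print(cached_call(lambda: {"rows": 99}, target, fresh_for_an_hour))  # cached
        print(cached_call(lambda: {"rows": 99}, target, fresh_for_an_hour, force_run=True))
```

Keeping the validator as a separate callable means the persistence (the file at `target`) and the validity rule stay independent, which mirrors the separation the question observes between the two sets of options.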
  •

    Albert Franzi

    01/04/2021, 2:23 PM
    Hi there! Albert Franzi from Typeform here. We are evaluating switching from Airflow to Prefect, and I was wondering which mechanism the community uses to `register` flows with CI/CD tools. Do you have a side Docker container that pulls the master branch and registers new flows using the `idempotency_key=flow.serialized_hash()` approach (similar to Airflow)? Or any advice on a better way of doing it?
    2 replies · 2 participants
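The idea behind `idempotency_key=flow.serialized_hash()` is that registering an unchanged flow yields the same key, so no new version is created, while any structural change produces a new key. The standard-library sketch below imitates that idea with SHA-256 over a canonical JSON description of a flow; the dictionary layout and `needs_registration` are hypothetical and do not reproduce what Prefect actually serializes.

```python
import hashlib
import json
from typing import Optional


def structure_hash(flow_description: dict) -> str:
    """SHA-256 of a canonical JSON encoding, so key order and whitespace don't matter."""
    canonical = json.dumps(flow_description, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def needs_registration(flow_description: dict, last_key: Optional[str]) -> bool:
    """True when the flow changed since the key recorded at its last registration."""
    return structure_hash(flow_description) != last_key


if __name__ == "__main__":
    v1 = {"name": "etl", "tasks": ["extract", "transform", "load"], "edges": [[0, 1], [1, 2]]}
    reordered = {"edges": [[0, 1], [1, 2]], "tasks": ["extract", "transform", "load"], "name": "etl"}
    key = structure_hash(v1)
    print(needs_registration(reordered, key))  # False: same structure, same key
    v2 = dict(v1, tasks=["extract", "clean", "transform", "load"])
    print(needs_registration(v2, key))  # True: structure changed
```

In a CI job this corresponds to registering every flow on each merge with the hash as the idempotency key and letting unchanged flows be no-ops.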
  •

    Matthew Blau

    01/04/2021, 2:24 PM
    Hello all, I am getting this message on my Prefect Server setup. I changed `config.toml` to point to the correct IP address, modeled after this https://github.com/PrefectHQ/prefect/blob/master/src/prefect/config.toml example. What am I doing wrong?
    3 replies · 3 participants
  •

    wiretrack

    01/04/2021, 2:37 PM
    Hey all, I have a question regarding Dask. In an ETL that leverages pandas quite a lot, should I stick with the Dask executor alone, or should I use both the executor and Dask DataFrames? I’m having a bit of trouble understanding when to use just the scheduler/executor and when to use both the executor and the DataFrame APIs.
    9 replies · 2 participants
  •

    Charles Lariviere

    01/04/2021, 6:16 PM
    Hey all 👋 Has anyone here written or found a guide on how to actually deploy Prefect on AWS in a production setting? It’s great to be able to run a flow and deploy agents locally (or on ECS by running `prefect agent ecs start`), but for Prefect to run flows on a schedule, an agent must always be running (since Prefect Cloud doesn’t spin up agents). How does one actually do this without spending days experimenting?
    7 replies · 3 participants
  •

    Arash Roshani

    01/04/2021, 6:37 PM
    Hello all, I'm building an ETL with a giant flow made up of several subflows. I'm trying to break up my main flow and keep the subflows in different modules. Is there an easy way to do this? Should I subclass Flow for every subflow and manually define the task dependencies in each one, or is there a simple way to do it, similar to defining a flow with a `with` statement? Does anyone have an example?
    2 replies · 2 participants
  •

    Marc Lipoff

    01/04/2021, 9:18 PM
    Hi all. I have a situation where my flow uses mapped tasks. At times, though, there are no logs for some child tasks. Here is an example. The other interesting thing is that I can't reproduce it on my local machine; it only seems to happen on Prefect Cloud.
    5 replies · 2 participants
  •

    jack

    01/04/2021, 10:24 PM
    Hey all! For the past few weeks I have still been having issues running any newly registered flows from Prefect Cloud (with all Prefect versions, even simple 'hello world' flows that run just fine on my local agent). I have checked that my Kubernetes agents are still alive by running previously registered flows, and they all worked fine. I am getting either of these two error messages:
    {'type': ['Unsupported value: UniversalRun']}
    {'_schema': 'Invalid data type: None'}
    I'm not able to run any newly built flows, so this has a massive impact on my work. Has anyone had this issue and been able to resolve it?
    8 replies · 2 participants

Kyle Moon-Wright

01/04/2021, 10:52 PM
Hey @jack, you’ll likely need to reinstall your agents with the latest version of Prefect if your newly registered flows use a later version. Agents can pick up flows registered with the same or an earlier version of Prefect. If that doesn’t work, though, let us know!

jack

01/05/2021, 1:26 AM
Hey @Kyle Moon-Wright, thanks for that! I reinstalled my agents with the latest Prefect version, and my old flows still work (thankfully). Then I registered my new flows and they gave me this error. Could you help me understand the issue here?
14:22:37
ERROR
execute flow-run
Failed to load and execute Flow's environment: ValueError('unsupported pickle protocol: 5')

14:22:47
ERROR
k8s-infra
Pod prefect-job-c30cdca2-x9qzh failed.
	Container 'flow' state: terminated
		Exit Code:: 1
		Reason: Error

Kyle Moon-Wright

01/05/2021, 1:37 AM
Hmm, that one is tougher, but it is likely due to a Python version mismatch of some sort: pickle protocol 5 was introduced in Python 3.8. This user encountered something similar, as seen in this thread.

jack

01/05/2021, 3:53 AM
@Kyle Moon-Wright Now the flow seems to run when I register it with Prefect versions up to `0.13.19`. The moment I register the flow with Prefect `0.14.0`/`0.14.1`, it throws the `unsupported pickle protocol: 5` error. I am not quite sure I understood that thread properly, but the Python version on my local machine is `3.8.6`. How and where do I check the remote Python version?
The Docker image was pushed using the latest image (`docker pull prefecthq/prefect:latest-python3.8`), and it's a Kubernetes agent.
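To see which side is which, it helps to compare the interpreter that pickles the flow with the one that unpickles it. Pickle protocol 5 was added in Python 3.8 (PEP 574), so a 3.7 interpreter raises `unsupported pickle protocol: 5` when it meets one, and cloudpickle (which Prefect uses for serialization) defaults to the highest protocol its interpreter supports. The standard-library sketch below reports the interpreter's details and reads the protocol number from a pickle's header (protocol 2 and later begin with the `PROTO` opcode `0x80` followed by the version byte). Running the same report inside the flow's image, e.g. `docker run --rm <image> python -c "import sys, pickle; print(sys.version, pickle.HIGHEST_PROTOCOL)"` with `<image>` as a placeholder, is one way to check the remote Python version.

```python
import pickle
import sys
from typing import Optional


def interpreter_report() -> dict:
    """Details worth comparing between the registering machine and the flow's container."""
    return {
        "python": sys.version.split()[0],
        "highest_pickle_protocol": pickle.HIGHEST_PROTOCOL,
        "default_pickle_protocol": pickle.DEFAULT_PROTOCOL,
    }


def pickle_protocol(data: bytes) -> Optional[int]:
    """Return the protocol recorded in a pickle's header, or None for protocols 0 and 1.

    Protocols 2+ start with the PROTO opcode (0x80) followed by the protocol number.
    """
    if len(data) >= 2 and data[0] == 0x80:
        return data[1]
    return None


if __name__ == "__main__":
    print(interpreter_report())
    payload = pickle.dumps({"x": 1}, protocol=pickle.HIGHEST_PROTOCOL)
    print(pickle_protocol(payload))  # 5 on Python 3.8+, which a 3.7 interpreter cannot load
```

If the registering machine reports 3.8 while the container that runs the flow reports 3.7, pinning both to the same Python minor version is likely the fix.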

Kyle Moon-Wright

01/05/2021, 5:03 PM
Hmm, it's tough to say where the discrepancy is, but in general I would recommend pulling the latest image with `docker pull prefecthq/prefect:latest`, as long as you can ensure your newly registered flows and your agent are running the most recent version (0.14.1).

jack

01/06/2021, 2:37 AM
I have done so and re-pushed the image to the ECR repository. Then I reinstalled the EKS cluster agent using Prefect 0.14.1 (the version shown on my agent tab in the Prefect UI). I'm still not able to run any flows registered with 0.14.0+ because of the same unsupported pickle protocol issue. FYI, I used this Medium post to set up our agent: https://medium.com/the-prefect-blog/seamless-move-from-local-to-aws-kubernetes-cluster-with-prefect-f263a4573c56