prefect-community
  • s

    Sean Talia

    12/02/2020, 5:24 PM
    has anyone else ever seen this behavior whereby they register a flow (using DockerRun run config, Docker for storage) with labels, and the labels show up in the Prefect UI, but not in the terminal output? e.g. I execute:
    with Flow(
        "docker-flow", run_config=DockerRun(labels=["data-science"]), storage=Docker()
    ) as flow:
    and then register the flow. The flow shows up in the UI with the data-science label, but the terminal output when registering the flow looks like:
    Flow URL: <flow_url>
     └── ID: <id>
     └── Project: test-project
     └── Labels: []
    this was causing me to believe that the labels weren't getting attached to the flow, but they are in fact there
    j
    2 replies · 2 participants
  • a

    Anish Chhaparwal

    12/02/2020, 5:27 PM
    hey, I have a couple of questions: is it possible to use cache_for in a ShellTask? And how can I pass a parameter to the command (f-string) of a ShellTask? E.g.:
    git_clone = ShellTask(log_stderr=True)
    with Flow('QETL') as flow:
        git_url = Parameter("git_url", default="https://github.com/ieee8023/covid-chestxray-dataset")
        git_clone(command="git clone {url} {target}".format(url=git_url))
    flow.run()
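    One way this could work (a minimal sketch, assuming the command string is built by an upstream task at run time, since a Parameter has no value at flow-definition time; build_command and target_dir are hypothetical names):
    from prefect import Flow, Parameter, task
    from prefect.tasks.shell import ShellTask

    git_clone = ShellTask(log_stderr=True)

    @task
    def build_command(url, target):
        # Parameters are tasks, so their values only exist at run time;
        # assemble the shell command inside a task rather than with .format() at build time
        return f"git clone {url} {target}"

    with Flow('QETL') as flow:
        git_url = Parameter("git_url", default="https://github.com/ieee8023/covid-chestxray-dataset")
        target_dir = Parameter("target_dir", default="covid-chestxray-dataset")  # hypothetical parameter
        git_clone(command=build_command(git_url, target_dir))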
    m
    2 replies · 2 participants
  • a

    ale

    12/02/2020, 6:24 PM
    Hi folks, is it possible to pass environment variables to containers when starting a Prefect agent in Docker mode from the command line? I see we can pass volumes, but not environment variables. Is this possible?
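    If memory serves, the agent CLI has an --env/-e flag that forwards environment variables into flow-run containers (a sketch, not verified against this exact release):
    prefect agent start docker --env MY_VAR=value -e OTHER_VAR=value2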
    j
    m
    4 replies · 3 participants
  • a

    Andrew Hannigan

    12/02/2020, 6:53 PM
    When using a fargate agent - does the entire flow run as one task within fargate? Or does each prefect task correspond to a fargate task?
    j
    1 reply · 2 participants
  • a

    Aiden Price

    12/02/2020, 10:25 PM
    Hi everybody! I'm a bit confused by the new run_config. I can get KubernetesRun() to work with no problems at all, but I'm unsure which run_config to use for my pre-existing Dask cluster. I presume I need the DaskExecutor, but do I use a LocalRun with it like I used to use a LocalEnvironment? I'm a fan of this change by the way. Thank you all.
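    For reference, the division of labor being asked about, in code form (a sketch, assuming a 0.14-style flow.executor attribute and a placeholder scheduler address):
    from prefect import Flow
    from prefect.run_configs import LocalRun
    from prefect.executors import DaskExecutor  # prefect.engine.executors in older 0.13 releases

    with Flow("uses-existing-dask-cluster") as flow:
        ...

    # The run_config decides where the flow-runner process starts;
    # the executor decides where the tasks themselves execute.
    flow.run_config = LocalRun()
    flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")  # assumed address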
    j
    b
    +1
    10 replies · 4 participants
  • a

    Andrew Hannigan

    12/02/2020, 11:03 PM
    How would I execute a flow from Prefect Cloud/UI on a dask cluster in the absence of a dask agent? Seems as though dask assumes responsibility for what the agent normally does, but without an agent how do we indicate to prefect cloud where to find the dask cluster?
  • m

    Matt Drago

    12/03/2020, 5:03 AM
    Hey folks, I'm building a flow that will use the MySQLFetch task, and I have noticed that I cannot use Secrets to populate the user and password fields. I noticed this thread from a few months ago, but I cannot find an open issue on GitHub. I'd like to work on this issue; is it best for me to raise the issue, or is there another way I should go about it?
    d
    2 replies · 2 participants
  • j

    Joël Luijmes

    12/03/2020, 12:31 PM
    Hey, question! I'm running Prefect in Kubernetes, and I began working on a flow which will fire up 10 Kubernetes jobs. However, currently it is spawning these jobs sequentially, while I want them all to start immediately. To that end, I deployed Dask to the cluster (with https://helm.dask.org), used the DaskExecutor, and pointed it at the scheduler. However, the same thing is still happening 😞. Is there other configuration required for this to work? I tried running the flow locally with the DaskExecutor, and then it does its magic in parallel 🙂 (Note: tested it with a different flow.) I'll post the details in the thread.
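    For context, the setup being described looks roughly like this (a sketch; the scheduler address follows the Dask Helm chart's default service naming and is an assumption):
    from prefect.executors import DaskExecutor  # prefect.engine.executors in older 0.13 releases

    # point the executor at the in-cluster scheduler deployed by the Helm chart
    flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")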
    ✅ 1
    d
    37 replies · 2 participants
  • d

    Dimitris Stafylarakis

    12/03/2020, 1:22 PM
    hey all! I have an issue with using Docker storage; I am trying to use a pre-built base image, however I get an error (requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.40/images/create?tag=latest&fromImage=...) and docker.errors.ImageNotFound when I try to register the flow. Others have mentioned similar errors in the past, but as far as I saw this was when trying to deploy the flow, not when building the image. Any clues/hints/tips would be appreciated! I'm on OSX using docker-desktop btw, just the default config. Thanks in advance 🙂
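    The 404 on images/create suggests Docker is trying to pull the base image from a registry; if the base image only exists locally, Docker storage has a flag for that (a sketch, assuming the local_image argument available in 0.13-era releases; the image name is a placeholder):
    from prefect.environments.storage import Docker  # prefect.storage in 0.14+

    storage = Docker(
        base_image="my-prebuilt-base:latest",  # assumed local image name
        local_image=True,  # skip pulling the base image from a registry
    )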
    j
    d
    +1
    20 replies · 4 participants
  • p

    Payam Vaezi

    12/03/2020, 3:05 PM
    I'm testing Prefect Server and Prefect agents within local Kubernetes on my machine. I have all pods up and running and am submitting a job to the Kubernetes agent. However, I'm getting a Kubernetes Error: Back-off pulling image "4988c227fd72" error when I want to pull an image I built on my local Docker. I'm passing the image ID in the environment of the GraphQL call: "metadata": {"image": "4988c227fd72"}. Any idea why my local Kubernetes agent can't pull a local image?
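    One likely culprit (an assumption based on how Kubernetes resolves images, not verified against this setup): the kubelet can't pull a bare image ID; the image needs a repository:tag that is visible to the cluster's container runtime. A sketch:
    # give the local build a name:tag the cluster can reference...
    docker tag 4988c227fd72 my-flow:latest
    # ...and reference it by name in the flow's metadata:
    # "metadata": {"image": "my-flow:latest"}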
    d
    8 replies · 2 participants
  • d

    Daniel Rothenberg

    12/03/2020, 3:41 PM
    Hi community - I have a pair of tandem flows that are essentially a crude training/prediction workflow; one flow is intended to run once a day, train an ML model, and archive its output to Google Cloud Storage, and the other flow is intended to run once an hour, grabbing the latest trained model it finds in GCS as well as the latest input data from the past hour, making a prediction, and shooting that off to a database. Prefect has worked amazingly for this so far... it was super easy to integrate an agent into my team's k8s cluster, which has a lot of useful infrastructure including shared NFS mounts between pods and a dedicated dask-gateway to spin up distributed workflows. Major kudos to the team! I have a question though about scaling out this workflow. In my particular application, I basically need to apply this training/prediction workflow to O(10,000) different "assets". Put another way - my workflows each have a Parameter called "asset". I see two different ways of scaling my toy with one asset to the much larger set of a few thousand: 1. Use the .map() capabilities on my tasks and simply "map" over a list of "assets" for the flow to run through; or 2. Convert my "asset" Parameter to be a list of all the assets, and do all the mapping inside each of my tasks in the workflow, leaning on the fact that each task already has access to the Dask cluster I'm running the workflow on. I'm not sure which is the "recommended" approach here. The training/modeling flow generates, for each asset, artifacts on the order of 1MB or so that have to get passed around, and I'm worried about the performance of Prefect's scheduler if it needs to map tasks over a list of several thousand items. On the other hand, things get... complicated (with my naive workflow for saving trained models and whatnot)... if I have to manually do all the book-keeping inside each task in the flow. Thoughts? Does anyone have experience scaling out ML training/prediction workflows on Prefect who can offer some insight?
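    For reference, option 1 in code form (a minimal sketch; train and archive are hypothetical task names):
    from prefect import Flow, task

    @task
    def train(asset):
        # train a model for a single asset; returns ~1MB of artifacts
        ...

    @task
    def archive(artifact):
        # persist one asset's trained artifact to GCS
        ...

    with Flow("train-all-assets") as flow:
        assets = [...]  # O(10,000) asset identifiers
        artifacts = train.map(assets)  # one mapped task run per asset
        archive.map(artifacts)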
    d
    p
    6 replies · 3 participants
  • b

    Ben Fogelson

    12/03/2020, 3:56 PM
    Does prefect support task memoization through either results or targets? I.e. I want to cache task results based on their inputs, not based on a timestamp or a specific flow run.
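    The closest built-in at the time seems to be input-keyed caching (a sketch, assuming the cache_for/cache_validator API; all_inputs invalidates the cache when any input changes rather than on a timestamp alone). Note a duration is still required, so it isn't purely input-based memoization:
    from datetime import timedelta

    from prefect import task
    from prefect.engine.cache_validators import all_inputs

    @task(cache_for=timedelta(days=1), cache_validator=all_inputs)
    def expensive(x):
        ...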
    👀 1
    d
    1 reply · 2 participants
  • c

    Chris Jordan

    12/03/2020, 3:58 PM
    Hey, I've got a question that I haven't been able to find a clean answer to in the docs. I'd like to have a flow evaluate the results of one of its tasks and then conditionally spawn a new version of itself. Something like:
    @task(name="get next batch of records")
    def get_batch(result=PrefectResult()):
        ...
        return len(records)
    
    with Flow("import_flow") as flow:
        num_of_records = get_batch()
        if num_of_records.read() > 0: # this particular syntax doesn't do it, and I'm asking for how to read this here (if I should be)
            kickoff_task = StartFlowRun(project_name="imports", flow_name="import_flow") # StartFlowRun also doesn't seem to spawn a new task - is this the right way to call this?
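    One pattern that fits here (a sketch; has_more is a hypothetical helper task): a plain if can't see task results at build time, so the comparison has to live in a task, with case gating the conditional branch:
    from prefect import Flow, case, task
    from prefect.tasks.prefect import StartFlowRun

    @task
    def has_more(n):
        # runtime comparison: task results are only available inside tasks
        return n > 0

    kickoff = StartFlowRun(project_name="imports", flow_name="import_flow")

    with Flow("import_flow") as flow:
        num_of_records = get_batch()
        with case(has_more(num_of_records), True):
            kickoff()  # spawns a fresh run of this flow via the API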
    d
    3 replies · 2 participants
  • b

    Brian Mesick

    12/03/2020, 5:05 PM
    Hey folks. We're running into a weird issue where running a Docker-stored flow on version 0.13.18 (also tried 0.13.13) against a 0.13.13 Kubernetes agent yields the error:
    Usage: prefect execute [OPTIONS] COMMAND [ARGS]...
    Try 'prefect execute -h' for help.
    
    Error: No such command 'flow-run'.
    Looking through the history of the Slack it looks like this was a known issue with 0.13.0 agents, but I'm not seeing any other possible explanations. Any hints?
    d
    s
    10 replies · 3 participants
  • j

    Julio Venegas

    12/03/2020, 5:35 PM
    Hi everyone, what's the best pattern for finishing, as successful, a looping Flow that has been scheduled indefinitely? As of now, the main Task of the Flow raises ENDRUN(Finished(message="all rows from file read")) when it has finished reading all the lines in a big file (I'm not using the LOOP signal; the Flow is an exercise in properly using Result objects). Should I create a state handler and trigger a final reference task, or is there something like ENDRUN but at the Flow level? Any other alternatives?
    d
    31 replies · 2 participants
  • r

    Richard Hughes

    12/03/2020, 7:35 PM
    Hi, I saw a flow that had 2 PIDs in the logs, and for some reason the flow executed twice instead of once for a scheduled execution. This caused an issue in my environment. How can I narrow down how this happened? flow-run a24b31dc-16ea-45ce-9816-1eccb4cbb34a
    d
    12 replies · 2 participants
  • a

    Alex Joseph

    12/03/2020, 8:06 PM
    Hi, I have a mapped task creating n different subtasks. I want to persist+checkpoint the output such that if a subtask completes, it doesn't need to be rerun (but the other subtasks are rerun) on the next run. I can implement this by using the target option, but I don't seem to see any option to template the output with the function arguments. For example, I'd like to have:
    @task(target="{single_pattern}_{config['a']}.txt", checkpoint=True, result=LocalResult(dir="cache/"))
    def run_spark_job(single_pattern, config):
        ...
    I can template parameters, date, etc., but I don't seem to have a way to template the arguments themselves. What's the correct way to handle this situation? Thanks 🙂
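    If task inputs are exposed to target templating in the version at hand (an assumption; this landed somewhere in the 0.13/0.14 line, if memory serves), the template could reference the arguments directly. A sketch, with the nested access written in str.format's bracket style:
    from prefect import task
    from prefect.engine.results import LocalResult

    @task(
        target="{single_pattern}_{config[a]}.txt",  # assumes inputs are available to the template
        checkpoint=True,
        result=LocalResult(dir="cache/"),
    )
    def run_spark_job(single_pattern, config):
        ...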
    m
    3 replies · 2 participants
  • l

    Lee Mendelowitz

    12/03/2020, 10:33 PM
    Hey Community - are there any examples out there for how to use the AWSClientWaitTask? (https://docs.prefect.io/api/latest/tasks/aws.html#awsclientwait). Specifically I’m trying to wait on an AWS Batch job to finish. It’s not clear to me how to pass the job name or job id into the task. Thanks!
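    A sketch of how the waiter arguments might be wired up for Batch (assuming AWSClientWait passes waiter_kwargs straight through to the underlying boto3 waiter, and that job_id comes from an upstream submit step):
    from prefect.tasks.aws.client_waiter import AWSClientWait

    wait_for_batch = AWSClientWait(client="batch", waiter_name="JobComplete")

    # inside a flow: boto3 waiter arguments go through waiter_kwargs
    wait_for_batch(waiter_kwargs={"jobs": [job_id]})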
    d
    3 replies · 2 participants
  • j

    JD Margulici

    12/04/2020, 1:14 AM
    Hello, I'm a new user of Prefect, currently building a SaaS application that involves document digitization and a business intelligence layer. One question I have been asking myself, and that seems to elicit a good amount of discussion, is whether it is possible to compose flows. I have seen proposals for subflows, and there is also a PIN on combining tasks: https://docs.prefect.io/core/PINs/PIN-05-Combining-Tasks.html. The PIN concludes that the model won't change and that "we have decided [...] [to] write clear documentation for how to build individual "combined" tasks for individual use cases". Is that documentation available? With all the proposals floating around, it is hard to separate what is implemented and recommended practice from proposals and speculation. So in short: what is the recommended way to group tasks into logical units of work and make the overall application modular with respect to these groups?
  • a

    Andrew Hannigan

    12/04/2020, 1:44 AM
    With Environments being deprecated, what is the suggested replacement for something like DaskCloudProviderEnvironment?
    j
    2 replies · 2 participants
  • s

    Severin Ryberg [sevberg]

    12/04/2020, 9:23 AM
    Is it already possible for Prefect's Lambda-related tasks to take advantage of AWS Lambda's new support for container images? This is a game-changer, in my opinion, and I'm really eager to try it out! I imagine it could be something as "easy" as the DaskKubernetesExecutor, but with the serverless-goodness of Lambda 😄. If not already supported, is this something that's on the development radar?
    n
    1 reply · 2 participants
  • n

    Neeraj Sharma

    12/04/2020, 11:59 AM
    What is the authentication mechanism in Prefect?
    n
    1 reply · 2 participants
  • d

    David Kuda

    12/04/2020, 4:03 PM
    Hey everyone! I have been testing Prefect for two weeks now. We soon want to test it on some real projects! 🙂 I am trying to accomplish some things, but I don't succeed yet … For example, there is this issue which is really blocking me: how do I change the port for the database? This is how I do it now: I run
    prefect server start --postgres-port=5435 --ui-port=8081
    -> that works. And now I want to change it so that the config.toml handles it. So in my home directory (~/.prefect) I have the file config.toml with the following content:
    [server]
    	[database]
    		host = "localhost"
    		port = 5435
    		host_port = 5435
    		url = "https://localhost:5435"
    		connection_url = "postgresql://prefect:test-password@localhost:5435/prefect_server"
    		name = "prefect_server"
    		username = "prefect"
    		password = "test-password"
    		volume_path = "/Users/david/.prefect/pg_data"
    I have tried many things with this config.toml file … but nothing has worked. When I try prefect server start, I receive an error: ERROR: for t_postgres_1  Cannot start service postgres: Ports are not available: listen tcp 0.0.0.0:5432: bind: address already in use. What's going wrong? When I write something wrong, say a colon instead of an equals sign, prefect server start returns yet another error, which is good. I have examined the possible config options with prefect config; I did not see any other setting for database/postgres. I am following the docs very precisely, and I even did the YouTube tutorials hosted by Laura, which were all great. Best regards from Berlin!
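    For what it's worth, the nesting in that file looks like the likely culprit: TOML does not nest tables by indentation, so [database] written after [server] defines a top-level [database] table, not server.database. A sketch of the dotted form (whether every key here is honored by this Prefect version is an assumption):
    [server.database]
    host = "localhost"
    port = 5435
    host_port = 5435
    connection_url = "postgresql://prefect:test-password@localhost:5435/prefect_server"
    name = "prefect_server"
    username = "prefect"
    password = "test-password"
    volume_path = "/Users/david/.prefect/pg_data"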
    m
    5 replies · 2 participants
  • h

    Hui Zheng

    12/04/2020, 6:38 PM
    Hello, @Kyle Moon-Wright. We ran into a strange issue with flow deployment to Prefect Cloud. The context:
    1. We use flow.storage = Docker() to build our flow deployment.
    2. We then use flow.storage.build(push=False) to build the Docker container locally and test-run it locally.
    3. Lastly, we use flow.register() to deploy the flow to Prefect Cloud.
    All 3 steps work fine on my local machine; I can deploy the flow to Prefect Cloud successfully. My colleagues want to do the same on their local machines; however, when two of my colleagues did it, they ran into a healthcheck issue at step 3 (see issue detail in the thread). We are using the same code base and the same build-context environment. We have set up another Docker container in which we build and deploy the Prefect flow, so that we all have the same runtime and libraries when doing build-and-deploy. This is a high-priority issue for us, because currently only I can do the flow deployment. It puts us at high risk of production downtime if my colleagues cannot patch and deploy the Prefect flow in an emergency. cc: @jars @Julie Sturgeon prefect version: prefecthq/prefect:0.13.15-python3.8
    n
    11 replies · 2 participants
  • d

    DJ Erraballi

    12/04/2020, 6:54 PM
    https://docs.prefect.io/orchestration/concepts/flows.html#toggling-heartbeats we are running into a task that is long running (~1 hr) that is getting killed by the zombie killer
    n
    3 replies · 2 participants
  • d

    DJ Erraballi

    12/04/2020, 6:56 PM
    Is our best option to disable heartbeats, by calling the mutation on the current flow ID?
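    The docs page linked above describes a disable_flow_heartbeat mutation; a sketch of invoking it through the Python client (the flow ID is a placeholder):
    from prefect.client import Client

    Client().graphql(
        """
        mutation {
          disable_flow_heartbeat(input: { flow_id: "<flow-id>" }) {
            success
          }
        }
        """
    )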
  • d

    DJ Erraballi

    12/04/2020, 6:56 PM
    is there an easy way to figure out why heartbeats would stop getting sent?
  • d

    DJ Erraballi

    12/04/2020, 6:59 PM
    No heartbeat detected from the remote task; marking the run as failed.
  • d

    Daniel Nussbaum

    12/04/2020, 7:18 PM
    Apologies if this is a basic question -- we are considering using Prefect Cloud for our ETL tasks. I cannot find specifically what data is sent from Prefect Server to Prefect Cloud -- is there a document outlining the data being sent and security standards / policies more generally? I'd appreciate anyone who can point me in the right direction here!
    n
    d
    8 replies · 3 participants
  • a

    Alex Joseph

    12/04/2020, 7:41 PM
    While trying to use S3List with prefix and delimiter (trying to list "folders" in an S3 prefix, say) I get an empty array:
    S3List(config['input_bucket_name']).run(prefix=test, delimiter="/", max_items=10)
    If I run it manually using boto, I'm able to get results:
    import boto3

    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects')
    result = paginator.paginate(Bucket=config['input_bucket_name'], Delimiter='/', Prefix=test)
    for prefix in result.search('CommonPrefixes'):
        print(prefix.get('Prefix'))
    Is this expected behavior?
    n
    3 replies · 2 participants
Looking at the code, one solution would be to have a way to get the CommonPrefixes rather than the Contents (or get both)
n

nicholas

12/04/2020, 9:40 PM
That's a good flag @Alex Joseph - would you mind opening a PR to the S3List task for that? 🙂
a

Alex Joseph

12/05/2020, 5:45 PM
I'm just getting my head around the code, will definitely put a PR once I'm comfortable 🙂