prefect-server

    Kurt Rhee

    08/09/2021, 2:05 PM
    Hello, is there a way to run all flows, or to run flows manually, from the UI?

    Madison Schott

    08/09/2021, 2:55 PM
    Hi all, trying to figure out the best way to deploy my Prefect agent to AWS and came across this tutorial: has anyone followed it for their production deployments? What are the benefits of reading the flow code from an S3 bucket? Thanks! https://towardsdatascience.com/serverless-data-pipelines-made-easy-with-prefect-and-aws-ecs-fargate-7e25bacb450c

    Marcus Hughes

    08/09/2021, 11:05 PM
    I've been using Prefect to run a couple of automated flows of some simple tasks on a local server, and I just discovered that Prefect appears to be saving serialized/pickled results into ~/.prefect/results without me specifically telling it to. That's fine in the short term, but after letting things run I discovered I had over 800 gigabytes of results. Is there an automated way built into Prefect to delete these after some given time elapses? I could just make another flow that cleans up results that are older than a day or something to keep us from ballooning out of drive space. Just curious what the best practice is here. Also, is it safe for me to just delete entries from this directory, or will it corrupt a database somewhere?
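
    (For anyone landing here later: those files are written by task result checkpointing, and they are plain pickles on disk rather than rows in a database, so deleting them won't corrupt anything; the one caveat is that restarting an old flow run that still references a deleted result will fail. A minimal cleanup sketch, assuming the default ~/.prefect/results location and one-day retention:)

    import time
    from pathlib import Path

    RESULTS_DIR = Path.home() / ".prefect" / "results"
    cutoff = time.time() - 24 * 60 * 60  # one day; adjust to taste

    # Result files are plain pickles; removing old ones frees disk space
    # but breaks restarts of runs that still reference them.
    for path in RESULTS_DIR.iterdir():
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()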

    Ryan Sattler

    08/10/2021, 1:32 AM
    hi - in my test environment I need to access my S3 storage (mocked out with LocalStack) via a different base URL depending on whether I'm registering the container (via my local command line) or actually running it (in local Kubernetes). I've tried to configure things like this:
    import os

    from prefect.storage import S3

    home = os.environ.get("HOME")
    if home != "/Users/ryan.sattler":
        s3_url = "http://localstack:4566"
    else:
        s3_url = "http://localhost:31566"

    flow.storage = S3(bucket="prefect-flows", key="hello-task.py", client_options={
        "endpoint_url": s3_url,
        "aws_access_key_id": "",
        "aws_secret_access_key": ""
    })
    However, this doesn't seem to work (the localhost URL is always being used, so it works for registration but not at runtime), possibly because the value of flow.storage is getting baked in at registration time? How can I make this dynamic? Unfortunately, given my company's security setup, there is no convenient way to use a real S3 bucket (or a real container registry, etc.) when testing locally.

    Сергей Романкевич

    08/10/2021, 5:19 AM
    Hi everyone! After updating the server to version 0.15, I cannot register new flows. The server itself works fine and executes the already registered flows. When I try to run the code, I get the following error: requests.exceptions.HTTPError: 407 Client Error: Proxy Authentication Required for url: http://localhost:4200/graphql. How can I solve this error?
    rsv@srv-etl:~/flow$ /bin/python /home/rsv@comandor.local/flow/Test.py
    Traceback (most recent call last):
      File "/home/rsv@comandor.local/flow/Test.py", line 13, in <module>
        flow.register(project_name='Test')
      File "/usr/local/lib/python3.7/dist-packages/prefect/core/flow.py", line 1734, in register
        idempotency_key=idempotency_key,
      File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 1081, in register
        project = self.graphql(query_project).data.project  # type: ignore
      File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 534, in graphql
        retry_on_api_error=retry_on_api_error,
      File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 452, in post
        retry_on_api_error=retry_on_api_error,
      File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 696, in _request
        session=session, method=method, url=url, params=params, headers=headers
      File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 605, in _send_request
        response.raise_for_status()
      File "/usr/local/lib/python3.7/dist-packages/requests/models.py", line 943, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 407 Client Error: Proxy Authentication Required for url: http://localhost:4200/graphql
      "config_overrides": {},   "env_vars": [],   "system_information": {     "platform": "Linux-4.19.0-14-amd64-x86_64-with-debian-10.8",     "prefect_backend": "server ",     " prefect_version ":" 0.15.0 ",     " python_version ":" 3.7.3 "    Test.py
    from prefect import task, Flow, Parameter

    @task(log_stdout=True)
    def say_hello(name):
        print("Hello, {}!".format(name))

    with Flow("My First Flow") as flow:
        name = Parameter('name')
        say_hello(name)

    flow.register(project_name='Test')
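
    (A likely fix: the 407 means the client is sending the localhost request through the corporate proxy. Excluding local addresses from proxying before registering usually resolves it; a sketch, assuming the proxy is configured via the usual environment variables:)

    # Bypass the proxy for local addresses, then register the flow
    export NO_PROXY="localhost,127.0.0.1"
    export no_proxy="localhost,127.0.0.1"
    python Test.py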

    Rodrigo Menezes

    08/10/2021, 2:15 PM
    Any chance we can update the Prefect Docker packages? Snyk caught some minor vulns in the current version. The problem is upstream: I think the official Docker Python image was a little behind, so a simple re-build should fix this. (None of these vulns are important if your Prefect agent is in a private subnet; I mostly care to make our automated security tools shut up 🙃)

    Julio Venegas

    08/10/2021, 4:18 PM
    hey prefect-server folks! I'm working on a state handler function that sends a notification to Teams when a task fails, when the same task is retried, and when the retry of the task is successful. The notification on failure and retry of the task is working fine, but I'm not managing to trigger the notification when the retry is successful. Any help? Code inside thread.
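
    (A sketch of one way to catch the "retry then success" case: when a task reaches Success, prefect.context's task_run_count is greater than 1 if a retry happened. The notify_teams helper below is hypothetical, standing in for the actual webhook call:)

    import prefect
    from prefect.engine.state import Failed, Retrying, Success

    def notify_teams(message):
        """Hypothetical helper that posts to a Teams webhook."""
        ...

    def teams_handler(task, old_state, new_state):
        if isinstance(new_state, (Failed, Retrying)):
            notify_teams(f"{task.name} entered {type(new_state).__name__}")
        elif isinstance(new_state, Success):
            # task_run_count > 1 means this Success came from a retry
            if prefect.context.get("task_run_count", 1) > 1:
                notify_teams(f"{task.name} succeeded after a retry")
        return new_state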

    YD

    08/10/2021, 11:21 PM
    Can you manage user permissions on an on-prem Prefect server?

    Tom Forbes

    08/11/2021, 5:09 PM
    Is it OK to assume that when following the "scaling out" section of the tutorial (https://docs.prefect.io/core/tutorial/06-parallel-execution.html#scaling-out), when using a remote DaskExecutor, logs from tasks will not be mirrored locally and we will have to build a custom logger for this? This limitation isn't explicitly detailed, which is confusing

    Madison Schott

    08/12/2021, 10:02 PM
    Has anyone else ever followed this tutorial when using DbtShellTask? I've created my Dockerfile to be similar but am having issues with my dbt files: https://www.lejimmy.com/distributed-data-pipelines-with-aws-ecs-fargate-and-prefect-cloud/

    Arun Dass

    08/12/2021, 11:02 PM
    Hey, is there a tutorial floating around for deploying your workloads to GKE using the Prefect Cloud backend?

    Arun Dass

    08/12/2021, 11:03 PM
    I'm looking for any docs on a horizontally scaling cluster setup.

    Ryan Sattler

    08/16/2021, 6:07 AM
    Is there a standard idiom for triggering a flow from something like a message arriving on a Kafka queue? I guess this could be done by polling with a high-frequency cron-scheduled flow but I wonder if there’s something more elegant.
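
    (One common pattern is a small, long-lived consumer process outside Prefect that triggers a run per message through the API, rather than polling. A minimal sketch, assuming kafka-python and a hypothetical topic and flow ID:)

    from kafka import KafkaConsumer
    from prefect import Client

    FLOW_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical registered flow

    client = Client()
    consumer = KafkaConsumer("my-topic", bootstrap_servers="localhost:9092")

    # One flow run per Kafka message, with the payload passed as a parameter
    for message in consumer:
        client.create_flow_run(
            flow_id=FLOW_ID,
            parameters={"payload": message.value.decode()},
        )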

    Hilary Roberts

    08/16/2021, 1:44 PM
    How does flow registration work in Prefect Server? We've been trialling Prefect Cloud and I have a good understanding of how registering flows with Prefect Cloud works. Now we also want to look at hosting our own instance of Prefect Server to get a good comparison. I have a Prefect Server instance set up on an AWS EKS cluster, and am wondering what the best way is to deploy flows to it. Is there a way to deploy a flow by running flow.register() from my local machine, as I would do if I was using Prefect Cloud, or do I have to copy the flow's code onto the machine on which I'm hosting Prefect Server and run flow.register() there? Can Prefect Server use external storage (like Cloud does), or are flows always stored on the server instance? I haven't found the answer to the above in the docs, sorry if I missed something obvious.
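
    (For later readers: registration against Server works the same way as against Cloud; point the local client at the Server's Apollo endpoint and call flow.register() from your machine. Server also supports the same external storage classes (S3, GCS, etc.); only metadata lives on the server instance. A sketch, with a hypothetical hostname:)

    prefect backend server
    # Hostname below is hypothetical; use the Apollo service your EKS cluster exposes
    export PREFECT__SERVER__ENDPOINT="http://my-prefect-apollo.example.com:4200/graphql"
    python my_flow.py  # the script calls flow.register(project_name=...)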

    kevin

    08/16/2021, 6:29 PM
    hey guys, if I want to check for the existence of a secret, is the best way to do that through prefect.context.get('<SECRET NAME>') or should I use the Secret() class?
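
    (A sketch of the Secret-class route, which covers both local and backend secrets; the secret name is hypothetical:)

    from prefect.client import Secret

    secret = Secret("MY_SECRET")  # hypothetical name
    if secret.exists():
        value = secret.get()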

    Alex Furrier

    08/16/2021, 10:08 PM
    I'm trying to debug a flow running on Prefect Server, but all the logs except for task start and end (e.g. Task 'my_task': Starting task run... and Task 'my_task': Finished task run for task with final state: 'Success') are not showing up. I'm trying to log info to help debugging within tasks like so:
    import prefect
    from prefect import task

    @task
    def my_task(x):
        logger = prefect.context.get("logger")
        logger.info(f"Task input: {x}")

        if isinstance(x, str):
            logger.error("Not good. Input x is a string")
            raise ValueError
        return x
    but nothing shows up in the logs on the server UI. Any ideas what I can do so that logs show up?
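
    (If task logs never reach the backend, one thing to check is the log-shipping settings in the environment where the flow actually runs; a sketch of the relevant Prefect 1.x knobs, worth verifying against your version's config names:)

    export PREFECT__LOGGING__LEVEL="INFO"
    export PREFECT__LOGGING__LOG_TO_CLOUD="true"  # despite the name, also applies to Server backends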

    Luke Kentwell

    08/17/2021, 10:26 AM
    Hi everyone, does anyone here have any advice for running Prefect Server behind an HTTP proxy? I am having all sorts of trouble running the server. All the containers start up, but the UI can't connect to the backend, and any agents I run can't connect either. Everything is running in the same Docker network, and I have the 'NO_PROXY=localhost,127.0.0.1' env set. If I remove the HTTP_PROXY env variables the server works fine, but my agent can't get to the outside world to download pip packages etc. If I leave the HTTP_PROXY env variable in, the server UI/agents can't seem to talk to the backend. Any help would be much appreciated 😄. (Also, full disclosure, I had raised an issue on GitHub for basically this, but closed it because I thought I had fixed the problem (https://github.com/PrefectHQ/prefect/issues/4859); turns out that while I got the server running, I lost internet access in the process. But I have since found this community, so thought I would ask here instead of raising issues on GitHub :D)
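
    (One thing worth trying: the server containers reach each other by docker-compose service name, so those names need to be excluded from proxying too, not just localhost. A sketch using the service names from Prefect Server's default compose file:)

    export NO_PROXY="localhost,127.0.0.1,apollo,graphql,hasura,postgres,towel,ui"
    export no_proxy="$NO_PROXY"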

    Payam Vaezi

    08/17/2021, 12:24 PM
    Hi all, I'm trying to execute a simple workflow on Prefect Server (deployed on a custom k8s) and getting this error when the cloud runner executes the flow:
    [2021-08-17 12:20:57+0000] INFO - prefect.S3 | Flow successfully downloaded. ETag: "ae24fd937c7bfe0317bf996718070f86", LastModified: 2021-08-17T12:13:58+00:00, VersionId: JLZcoHjb99CPDG
    [2021-08-17 12:20:58+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for '44cf3c22-40d8-427c-bbf2-d40d4ca113e5'
    [2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'source': Starting task run...
    [2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'source': Finished task run for task with final state: 'Running'
    [2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'sink': Starting task run...
    [2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'sink': Finished task run for task with final state: 'Pending'
    [2021-08-17 12:20:58+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
    Haven't seen this problem in local runs! Can someone advise me what may be wrong (I'm getting this intermittently)?

    Wilson Bilkovich

    08/17/2021, 3:48 PM
    So, I had a little trouble bootstrapping Prefect Server on Kubernetes. I used the Server Helm chart, then generated a deployment for the agent with the CLI, specifying --rbac and --api. The resulting agent gets an error from the server saying that no tenants have been defined, and suggests using the UI to create one. However, the UI doesn't seem to have any such tab/link. I had to exec into the server container, install Python and Prefect, and then run prefect server create-tenant --name default. Is there a more straightforward way to do that in future deployments?
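
    (For future deployments: the tenant can be created from any machine that can reach the Apollo endpoint, so no exec into the container is needed. A sketch, hostname hypothetical:)

    prefect backend server
    export PREFECT__SERVER__ENDPOINT="http://my-apollo-host:4200/graphql"  # hypothetical host
    prefect server create-tenant --name default --slug default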

    Wilson Bilkovich

    08/17/2021, 9:16 PM
    I did --set ui.apolloApiUrl=http://${CHART_NAME}-apollo:4200/graphql on the prefect-server Helm chart, but when I went to the UI, I still had to configure the API URL. Anybody know what I might be missing?

    Michael Hadorn

    08/18/2021, 2:12 PM
    Hi there, Prefect Server fills my server log with these entries:
    May 06 15:06:18 rnddkrpocwe01.uhbs.ch systemd[2918188]: run-docker-runtime\x2drunc-moby-f59082c1b02d87fbf7172c4bef313af171b43336a316ac5ea05f495dc159627d-runc.nCo90B.mount: Succeeded.
    May 06 15:06:19 rnddkrpocwe01.uhbs.ch systemd[2918188]: run-docker-runtime\x2drunc-moby-d55a5e41bcd0602af5682df47f4e5b944d17238994b03d9c2b34b8c4ec20c4ad-runc.KWtz2X.mount: Succeeded.
    May 06 15:06:19 rnddkrpocwe01.uhbs.ch systemd[1]: run-docker-runtime\x2drunc-moby-d55a5e41bcd0602af5682df47f4e5b944d17238994b03d9c2b34b8c4ec20c4ad-runc.KWtz2X.mount: Succeeded.
    According to https://github.com/docker/for-linux/issues/679, these are produced by the health checks. Is there a way to solve this? Best, Michael

    Pierre Monico

    08/19/2021, 10:05 AM
    Could someone very briefly explain what sort of infrastructure they have their Docker Agent deployed on? I am having a hard time understanding how exactly that will work; in particular, since it needs access to a Docker daemon to run e.g. DockerRun run configs, is this supposed to be a "normal" VM? Or can we do some Docker-in-Docker magic? (NB: I use Docker but am not an expert)

    Kevin Kho

    08/19/2021, 1:02 PM
    Hey @Pierre Monico, you will have some problems with Docker in Docker so it’s intended to be a normal VM that will spin up the Docker images. See this
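
    (A minimal sketch of that setup on a plain VM with Docker installed:)

    pip install prefect
    # The agent uses the VM's own Docker daemon to launch one container per flow run
    prefect agent docker start --label my-docker-vm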

    Pierre Monico

    08/19/2021, 3:15 PM
    Reading the Git storage class source, am I correct to assume that it's not possible to pull from a local repository?

    Serdar Tumgoren

    08/19/2021, 10:46 PM
    Hey all, I'm trying to test Prefect Cloud on Kubernetes (GKE) with DaskExecutor and GCS storage, based loosely on the instructions from Prefect (https://docs.prefect.io/orchestration/agents/kubernetes.html#running-in-cluster). I'm able to get a Kubernetes agent to run (verified in the Prefect Cloud UI), but when I register a basic job and trigger a manual run in the Cloud UI, the job hangs in a pending state (submitted mode) and never executes. It feels like I'm botching something related to the networking layer, but I can't seem to pinpoint the issue after attempting some of the debugging techniques detailed in this thread: https://prefect-community.slack.com/archives/C014Z8DPDSR/p1628175479115400 Can anyone provide advice or a reference example for GKE in particular?

    Alex Furrier

    08/19/2021, 11:14 PM
    On a K8s-deployed Prefect Server, the graphql pod is stuck in a CrashLoopBackOff. Not sure why it exited as Completed?
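
    (A debugging sketch; the label selector depends on how the Helm chart names things, so treat it as hypothetical:)

    # Logs from the previous (crashed) container, plus events for the pod
    kubectl logs -l app.kubernetes.io/component=graphql --previous
    kubectl describe pod -l app.kubernetes.io/component=graphql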

    Alex Furrier

    08/20/2021, 4:05 AM
    Not sure if this is related to my post above, but now I'm running into an error where flow results appear to be dropping after mapping across 1000+ items. To reduce memory I'm serializing a task result and loading it from a passed location downstream. I initially read in data records, process them, and serialize them to Azure blob storage before returning a list of the serialized locations. Downstream I process these further, first deserializing them and then processing them. At a certain point (usually about 1100 records in but that could be totally arbitrary) the mapped task fails because the passed result location reference is None for some reason. I've logged the result locations after the first serialization and there are no Nones in it, yet later on the string location is no longer present and breaks the task. Below is the pseudo code:
    import os

    from prefect import Flow, unmapped
    from prefect.engine.results import AzureResult
    from prefect.executors import LocalDaskExecutor

    RESULT_HANDLER = AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
                                 container='prefect')

    with Flow(
        'Example Flow',
        executor=LocalDaskExecutor(
            scheduler="threads",
            num_workers=8,
            namespace='prefect'
        ),
    ) as example_flow:

        result_locations = query_data_process_serialize(query)
        # Logging locations to be sure they are serialized
        log_item(result_locations)

        # This seems to break after ~1000 result locations
        result_locations = deserialize_and_process_more.map(
            record=result_locations,
            result_handler=unmapped(RESULT_HANDLER),
            upstream_tasks=[result_locations]
        )
    I've run this a few times and the same thing always happens. I've copied the logged result locations and indexed that list using failed task children to see if I can deserialize using RESULT_HANDLER, and that works fine. E.g. if the flow failed on child 1100 when mapping deserialize_and_process_more, I tried running RESULT_HANDLER.serializer.deserialize(RESULT_HANDLER.read(result_locations[1100]).value) and was able to deserialize the data just fine. Not sure where the disconnect is happening where the result locations become None somehow.

    Pierre Monico

    08/20/2021, 2:39 PM
    After being able to install Prefect Server on an Ubuntu 20.04 machine, I get errors when running prefect server start. It seems like there is a problem with the Postgres instance. First I get errors from the hasura and graphql containers (they can't connect to Postgres), and then some Postgres debugging messages; but the other services still can't connect after that. Did anyone run into that before?

    Lawrence Finn

    08/20/2021, 3:12 PM
    I would like to communicate with prefect outside of python. Are there graphql definition files I can use to generate clients?
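
    (As far as I know the schema isn't shipped as standalone definition files, but any GraphQL client generator can work from standard introspection against the Apollo endpoint. A sketch with the default local port:)

    curl -s http://localhost:4200/graphql \
      -H "Content-Type: application/json" \
      -d '{"query": "{ __schema { types { name } } }"}'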

    Casey Green

    08/24/2021, 3:35 PM
    Hi. I'm trying to write a flow that has a nested for-while loop, where the while loop contains multiple tasks. I don't think this is possible - it's sort of a combination of apply_map and task looping with signal.LOOP. Any pointers/ideas for how to solve this pattern? Or is it definitely not possible? Here's kinda what the code looks like:
    for_while_flow.py

Kevin Kho

08/24/2021, 3:52 PM
Hey @Casey Green, been thinking about this for a bit and I don't think there is a way to get this to work because LOOPing itself is confined to one task. Looping over multiple tasks breaks the acyclic nature of the DAG. So I think you need to combine this into one big run_job task, and then LOOP there. You can just do trigger_job.run() and wait_to_complete.run() inside the big task, but they will just be like functions instead of Prefect tasks.
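
(A sketch of that shape, with hypothetical trigger_job / wait_to_complete / should_re_run helpers written as plain functions so the whole loop lives in one task:)

import prefect
from prefect import task
from prefect.engine.signals import LOOP

def trigger_job(job_name):
    """Hypothetical: kick off the external job and return a handle."""
    ...

def wait_to_complete(handle):
    """Hypothetical: block until the job finishes and return its result."""
    ...

def should_re_run(result):
    """Hypothetical: decide from the result whether another pass is needed."""
    ...

@task
def run_job(job_name, max_runs=3):
    loop_count = prefect.context.get("task_loop_count", 1)
    handle = trigger_job(job_name)      # plain calls inside one task, so the DAG stays acyclic
    result = wait_to_complete(handle)
    if should_re_run(result) and loop_count < max_runs:
        raise LOOP(f"attempt {loop_count} needs a re-run")
    return result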

Casey Green

08/24/2021, 3:54 PM
gotcha, that makes sense since this wouldn't be a valid DAG! thanks 🙏
If I were to make this a flow instead, is it possible to automatically trigger retries at the flow level? e.g.
with Flow("test") as flow:
    flow_name = Parameter("flow_name", required=True)
    handle = trigger_job(flow_name)
    result = wait_to_complete(handle)
    max_runs = 3

    # maybe this isn't a case... perhaps a terminal_state_handler that inspects the result and sets the state appropriately?
    with case(should_re_run(result), True):
        # trigger flow failure, but automatically retry.
        ...
Kinda looks and feels like a code smell 🤷‍♂️
I guess really what I'm trying to do is to retry an entire sub-branch of a DAG
which maybe is a weird/invalid thing to want to do 🤷‍♂️

Kevin Kho

08/24/2021, 4:39 PM
We don't have retries at the flow-run level, but if the flow is invoked through a StartFlowRun task or a create_flow_run task, then we have retries. The concept of a flow retry is not definitive, because some users expect everything to run again (even successful tasks) and some expect the run to pick up from where it left off. I think retries on these two Prefect tasks can be used for both scenarios by supplying an idempotency_key.
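
(A sketch of that pattern; the child flow name and project are hypothetical:)

from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent") as flow:
    child = create_flow_run(
        flow_name="test",              # hypothetical child flow
        project_name="Test",
        idempotency_key="my-run-key",  # same key means retries reuse the run instead of duplicating it
        task_args={"max_retries": 3, "retry_delay": timedelta(minutes=1)},
    )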

Casey Green

08/24/2021, 5:00 PM
oh interesting, I'll take a look at this task. this might work