prefect-community
  • s

    Son Mai

    06/26/2021, 3:21 PM
    My project structure is: project_name/ containing src/ (the logic code) and my_flow.py. • Running locally with python my_flow.py works. • After registering to Prefect Cloud and running the flow with my Local Agent, it fails: Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'src\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.') Update: I fixed it by running the local agent from the project path (the same directory as my_flow.py). But can I add each project's environment path to its flow, so that one local agent can run all flows?
    k
    • 2
    • 1
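The ModuleNotFoundError above comes down to whether the project root is on the import path of the process that loads the flow. The following is a minimal standard-library sketch of that mechanism only, not a Prefect configuration. The package name `demo_src` and the temporary project layout are hypothetical stand-ins for the `src/` directory described in the message.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical project layout, created in a temp dir for the demo:
#   <project_root>/demo_src/__init__.py
#   <project_root>/demo_src/logic.py
project_root = Path(tempfile.mkdtemp())
(project_root / "demo_src").mkdir()
(project_root / "demo_src" / "__init__.py").write_text("")
(project_root / "demo_src" / "logic.py").write_text("def answer():\n    return 42\n")


def can_import(module_name: str) -> bool:
    """Return True if the module can be imported in this process."""
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


# The project root is not on sys.path yet, so the import fails.
before = can_import("demo_src.logic")

# Adding the project root makes the package importable from any working directory.
sys.path.insert(0, str(project_root))
importlib.invalidate_caches()
after = can_import("demo_src.logic")

print(before, after)
```

Running the agent from the project directory has the same effect, because the working directory ends up on the import path of the flow-run process.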
  • b

    Ben Muller

    06/28/2021, 3:41 AM
    Hey - how do you deserialize a .prefect_result file that is persisted to S3 as a result?
    k
    • 2
    • 2
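For the deserialization question above, here is a rough sketch under one loud assumption: that the result was written with a pickle-based serializer. If a different serializer was configured, this will not apply. The in-memory buffer stands in for the object body downloaded from S3, and `load_result` is a hypothetical helper name.

```python
import io
import pickle


def load_result(raw: bytes):
    """Deserialize bytes that were written by a pickle-based serializer.

    Only unpickle data from a trusted source: unpickling can execute code.
    """
    return pickle.loads(raw)


# Stand-in for the bytes downloaded from S3 (assumption: pickle format).
fake_s3_body = io.BytesIO(pickle.dumps({"rows": 3, "path": "s3://foo/bar"}))

value = load_result(fake_s3_body.read())
print(value)
```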
  • r

    Rishabh Poddar

    06/28/2021, 7:21 AM
    Hey! I had a question about self hosted and pricing. I believe I need to use the open source version for that. Is that free at whichever scale? What about compliance and some of the features like ML training, read-only accounts etc..?
    k
    • 2
    • 4
  • s

    Shivam Shrey

    06/28/2021, 9:55 AM
    Hey, I am new to Prefect. When you run a flow using a Local Agent, where exactly do the flow and the tasks run? Do they run as a Python process or inside a Docker container? Or does where a task/flow runs depend on the type of agent? How can this be configured? I ask because when I start the Prefect server, several Docker containers are started...
    k
    • 2
    • 3
  • r

    Romain

    06/28/2021, 12:30 PM
    Hi Prefect community, I am running into a weird memory issue on my deployment. I'm using Prefect Core + Server with a K8s agent and a temporary Dask executor. My DAG contains a lot of tasks (because of some large mapping). The Dask scheduler's memory grows from a few hundred MB to several GB before it is killed. I can't really explain why that is, but it might be due to my limited understanding of Dask distributed. I also checked the issues on GitHub and found https://github.com/PrefectHQ/prefect/issues/4085. I'm personally using Azure Blob as storage. Could it be the same issue? To test that, I wanted to switch to Module storage, but for a reason that I don't understand, it does not spin up the temporary Dask cluster and runs everything on the K8s job. I would love to hear any suggestions. Thanks
    d
    • 2
    • 11
  • t

    thebuleon29

    06/28/2021, 4:39 PM
    Hi guys, I am trying to use Kubernetes RunNamespacedJob tasks in my flow. I installed Prefect using pip install "prefect[kubernetes]", but when I try to run my flow I get `ImportError: Using prefect.tasks.kubernetes requires Prefect to be installed with the "kubernetes" extra.` I also tried directly from source, cloning the git repo and running pip install "./prefect[kubernetes]" but I get the same result...
    k
    • 2
    • 6
  • d

    Dror Speiser

    06/28/2021, 7:42 PM
    Hej hej. A beginner's question: I have a simple flow, something like A->B, where A produces some file paths, and B does some stuff with them and then also produces some file paths. I've run this and it worked great. I now want to add a new part: A->B->C, and I want the previous runs to automatically rerun, with the output of B going into C. I think this should be possible with some combination of caching and checkpoints enabled, maybe running on Prefect Server/Cloud (i.e. not Core). Can this be done?
    k
    • 2
    • 2
  • l

    Leon Kozlowski

    06/28/2021, 8:51 PM
    Has anyone had to pip install a private github repo as a dependency in their Dockerfile while using DockerStorage?
    k
    • 2
    • 7
  • r

    Rob Fowler

    06/29/2021, 5:16 AM
    Back again, running prefect server for months. An associate wants to run prefect with the docker agent locally so I grabbed my working flows from my running prefect server. I deployed it locally to a local server with a docker agent. When I do a run, the docker agent picks up the flow and starts it: "DEBUG - Docker | Docker container 710cddcf0502310b6a1db495380c8b6dafe4b7db85b8a16ab96d84a0ef0cc99d started". But nothing else happens in the log or in the prefect UI. I am using the same custom docker storage from prod with k8s runners, and there it works fine.
    k
    • 2
    • 1
  • j

    John Berrisford

    06/29/2021, 8:08 AM
    Hi. We are looking to integrate Prefect into our system as a workflow engine. One of the things we are looking to move is the running of crons. These crons run on a few different schedules; some of them take 30-45 mins and we would like to run them hourly. For some of these tasks we would like to block the next run until the current run has finished. This is especially important for tasks like database loading, where we don't want two tasks running at the same time. Is it possible in Prefect to have a task block the next scheduled run of the same task? This is a default in Airflow, but I haven't found an option in Prefect to do this. Can anyone suggest how to do this? Thanks
    k
    m
    • 3
    • 13
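The "do not start the next run while the current one is still going" requirement above can be illustrated with a plain lock file. This is a generic sketch using only the standard library, not a Prefect feature, and it only helps when all runs share a filesystem. `single_run_lock`, `run_db_load` and the lock path are hypothetical names.

```python
import os
import tempfile
from contextlib import contextmanager

# Hypothetical lock location; a fresh temp dir keeps the demo free of stale locks.
LOCK_PATH = os.path.join(tempfile.mkdtemp(), "db_load.lock")


@contextmanager
def single_run_lock(path):
    """Yield True if this run obtained the lock, False if another run holds it."""
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        yield False
        return
    try:
        yield True
    finally:
        os.close(fd)
        os.remove(path)


def run_db_load(path=LOCK_PATH):
    """Run the load only when no other run currently holds the lock."""
    with single_run_lock(path) as acquired:
        if not acquired:
            return "skipped"
        return "loaded"


# Simulate a scheduled run that starts while an earlier run is still active.
with single_run_lock(LOCK_PATH) as first:
    second_result = run_db_load(LOCK_PATH)

# Once the earlier run has released the lock, the next run proceeds.
third_result = run_db_load(LOCK_PATH)
print(first, second_result, third_result)
```

This version skips the overlapping run rather than queueing it; waiting for the lock instead would be a different design choice.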
  • j

    Joseph Ellis

    06/29/2021, 11:56 AM
    I’m probably being stupid, but does anyone know where I can find the Prefect Cloud REST API reference?
    e
    • 2
    • 3
  • m

    Michael Law

    06/29/2021, 1:22 PM
    Hey folks, anyone got any thoughts on centralising logging for Prefect Cloud, e.g. writing logs out to somewhere else by default? I'd like to have a centralised logging story for our operational monitoring and alerting via Azure. Not sure if this is something on the Prefect roadmap? Keep up the amazing work!
    k
    • 2
    • 2
  • t

    Tom Forbes

    06/29/2021, 1:23 PM
    Hey! I’d like to clarify the best way to do a map-reduce style job with Prefect and Dask. We have some Parquet files in S3 containing a large number of S3 paths, and for each one we need to download the contents, run it through a library and collect the results into more Parquet files in S3. We could structure it like this using Dask:
    import dask.dataframe as dd
    
    # do_something_expensive is our per-row function (defined elsewhere)
    dataframe = dd.read_parquet("s3://foo/bar")
    dataframe = dataframe.apply(do_something_expensive, axis="columns")
    dataframe.to_parquet("s3://foo/bar2")  # computes and writes by default
    The question is: do Prefect mapping tasks help here? As I understand it a reduce task in Prefect cannot be done on each partition, so it needs the full set of inputs to be passed to it. If those inputs don’t fit in memory then it will fail? I must be misunderstanding something, because wouldn’t this limitation be quite… limiting?
    k
    s
    • 3
    • 11
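The memory concern in the map-reduce question above is usually addressed by reducing within each partition first and then combining the small per-partition summaries. Below is a plain-Python sketch of that idea, independent of Prefect and Dask. Squaring a number stands in for `do_something_expensive`, and all function names are hypothetical.

```python
from functools import reduce
from typing import Dict, Iterable, Iterator, List


def partitions(items: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive batches of at most `size` items."""
    batch: List[int] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def process_partition(batch: List[int]) -> Dict[str, int]:
    """Map over one partition and reduce it to a small summary."""
    return {"count": len(batch), "total": sum(x * x for x in batch)}


def combine(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    """Merge two partition summaries."""
    return {"count": a["count"] + b["count"], "total": a["total"] + b["total"]}


# Only one partition and one running summary are held in memory at a time.
summary = reduce(combine, (process_partition(p) for p in partitions(range(10), 4)))
print(summary)
```

The final reduce step only ever sees summaries, so its input size depends on the number of partitions, not on the number of rows.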
  • t

    Tom Forbes

    06/29/2021, 1:59 PM
    In the documentation here it shows how you would configure Dask using a gateway rather than using a temporary cluster:
    from dask_gateway import Gateway
    from prefect.executors import DaskExecutor
    
    gateway = Gateway()
    cluster = gateway.new_cluster()
    executor = DaskExecutor(
        address=cluster.scheduler_address,
        client_kwargs={"security": cluster.security}
    )
    flow.run(executor=executor)
    How is this supposed to work with flows that use Docker storage? The specific executor needs to be resolved at import time, and using the example code in the docs it would mean creating a cluster at import time.
    k
    • 2
    • 3
  • y

    Yanina Libenson

    06/29/2021, 2:53 PM
    hello everyone! i need your help. i am running a task that queries a postgresql database in amazon RDS. when the query is simple, the task runs successfully. when the query is heavier, I see “Process PID 61 returned non-zero exit code” in the shell, and the task gets cancelled after a while because there is no heartbeat. what can i do? btw, i tried this locally and it worked fine. now i uploaded my code to render.com and it fails.
    k
    • 2
    • 13
  • t

    Tom Forbes

    06/29/2021, 3:11 PM
    Are there any patterns we should follow for exposing the output of a flow to other flows? As I understand it this isn’t very simple right now, but if we have a flow that outputs some data to S3 and returns the path to that data as a Result, how would we consume it downstream? i.e
    with Flow() as flow:
        s3_path = get_output_from_task(project="abc", flow="test-flow")
        dataframe = read_data_from_s3(s3_path=s3_path)
    I see this is kind of supported by directly using the graphql API, and maybe having some convention around terminal output tasks, but I was wondering if there was a better way
    k
    • 2
    • 5
  • m

    Mahesh

    06/29/2021, 3:38 PM
    Hello Team, I am running a Prefect flow that maps a task over a list of values with task_run_name set, similar to the example below.
    @task
    def get_values():
        return ["value", "test", "demo"]
    
    @task(task_run_name="{val}", state_handlers=[post_to_mail])
    def compute(val):
        if val == "demo":
            raise ValueError("Nope!")
    
    with Flow("task_run_names") as flow:
        vals = get_values()
        compute.map(vals)
    I want to enable state handlers for the tasks so that failed tasks are reported with their task_run_name. Because the task is mapped, I am getting the task name but not the task_run_name. Is there anything in prefect.context, or any other way, to get the task_run_name in state handlers?
    k
    m
    k
    • 4
    • 9
  • j

    Jeremy Phelps

    06/29/2021, 6:15 PM
    Hi all, I've lost the ability to retrieve logs using the CLI tool:
    agent@prefect-agent:~$ prefect get logs --name granite-orca > log.log
    Traceback (most recent call last):
      File "/home/agent/.pyenv/versions/3.7.3/bin/prefect", line 10, in <module>
        sys.exit(cli())
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/cli/get.py", line 430, in logs
        result = Client().graphql(query)
      File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/client/client.py", line 319, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]
    I'm not sure what's going on, I had no trouble until recently. The Web UI can still retrieve logs, but it takes too long to scroll through them as it only keeps 100 entries in memory at a time.
    k
    m
    • 3
    • 9
  • b

    Brett Naul

    06/29/2021, 6:24 PM
    I'm seeing all my flows fail with
    requests.exceptions.RetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: /graphql (Caused by ResponseError('too many 429 error responses'))
    , maybe related to the above error?
    k
    • 2
    • 7
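HTTP 429 in the error above means the client is being rate limited. A common client-side pattern for that situation is exponential backoff between retries. The sketch below is generic and does not touch the Prefect client. `RateLimited`, `call_with_backoff` and `flaky_request` are hypothetical names, and the fake request stands in for a real API call.

```python
import time


class RateLimited(Exception):
    """Raised by the (hypothetical) request function on an HTTP 429 response."""


def call_with_backoff(func, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call func, waiting base_delay * 2**attempt after each rate-limit error."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise  # give up after the last attempt
            sleep(base_delay * 2 ** attempt)


attempts = {"n": 0}


def flaky_request():
    """Stand-in for an API call that is rate limited twice, then succeeds."""
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise RateLimited("HTTP 429")
    return "ok"


# Record the delays instead of sleeping so the demo finishes instantly.
delays = []
result = call_with_backoff(flaky_request, sleep=delays.append)
print(result, delays)
```

Backoff only smooths over short bursts; it does not help if the request volume itself stays above the limit.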
  • b

    Brad

    06/29/2021, 10:18 PM
    Has anyone had TLS issues connecting to an existing coiled cluster?
    TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got None
    k
    • 2
    • 7
  • m

    matta

    06/29/2021, 10:19 PM
    Would Coiled be a painless way to have Prefect Flows all run in their own little on-demand containers?
    k
    • 2
    • 2
  • c

    Connor Campbell

    06/29/2021, 11:23 PM
    Just starting out with Prefect and building a POC for my company which currently has no MLOps in place. This is my first try at building a flow. I would add I am a Data Scientist not an engineer. https://github.com/fjord-prefect/ModelOps-with-Prefect One question for you guys. Built this on windows; however, when I pull to my ubuntu machine I'm getting what I'm assuming are python and prefect path issues due to Linux/docker requiring me to use sudo? I can register the flow it just hangs in submitted state and I get an error in the agent logs that I have attached. I'm assuming it's something with sudo paths and docker, python and prefect. Any suggestions?
    error.txt
    m
    • 2
    • 4
  • k

    Kevin Kho

    06/30/2021, 12:47 AM
    Just a reminder that we have an event with Coiled tomorrow where I will be showing the new KV Store and our CEO Jeremiah will be giving general updates about the company. Hope to see you there!
    :cool-llama: 1
    :upvote: 3
  • t

    takan

    06/30/2021, 2:17 AM
    Hi all, I’m new here and have decided to learn Prefect. Is anyone using it with Airbyte? I just want to know if there is a common best practice for that.
    k
    • 2
    • 2
  • j

    jcozar

    06/30/2021, 8:31 AM
    Hi all! I have a question about using the idempotency key when registering a new flow. I am using flow.serialized_hash() as described in the documentation. However, my flow uses some global variables, and when they change, the flow's serialized hash doesn’t, so my flow is not re-registered if I use the idempotency key. Has anyone had the same problem, and how did you manage to solve it? Thank you very much!
    👀 1
    k
    • 2
    • 3
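One way to make the idempotency key above sensitive to global variables is to hash them together with the serialized hash. The sketch below uses only the standard library. `idempotency_key` is a hypothetical helper, the string `"abc123"` stands in for the value returned by flow.serialized_hash(), and `BATCH_SIZE` is an invented global.

```python
import hashlib
import json


def idempotency_key(serialized_hash: str, extra: dict) -> str:
    """Combine the flow's serialized hash with extra values into one stable key."""
    # sort_keys makes the key independent of dict ordering;
    # default=str lets non-JSON values (paths, dates) be included.
    payload = json.dumps(extra, sort_keys=True, default=str)
    return hashlib.sha256(f"{serialized_hash}:{payload}".encode("utf-8")).hexdigest()


base = "abc123"  # stand-in for flow.serialized_hash()

key_before = idempotency_key(base, {"BATCH_SIZE": 100})
key_same = idempotency_key(base, {"BATCH_SIZE": 100})
key_after = idempotency_key(base, {"BATCH_SIZE": 200})

print(key_before == key_same, key_before == key_after)
```

The combined value would then be passed as the idempotency key at registration, so a change in any listed global produces a new key.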
  • n

    Nikola Lusic

    06/30/2021, 11:15 AM
    Hello everyone. I've got a question regarding logging. In the documentation it is mentioned that you can add custom log handlers to Prefect logs. When adding a custom log handler and simply running the flow via the local script (flow.run()), the custom log handler works. However, when registering the flow to my localhost deployment of Prefect server and agent (flow.register()), the log handlers don't seem to do anything (although the logs are still visible in the Prefect UI). In the following example, when running the flow locally, all logs are sent to our Logstash instance successfully. However, when registering to the localhost server and running the flow, no logs are sent to Logstash.
    import prefect
    from prefect import task, Flow
    from prefect.run_configs import LocalRun
    from prefect.utilities.logging import get_logger as get_prefect_logger
    
    
    
    @task(name='Test Task')
    def test_task():
        logger = prefect.context.get("logger")
        logger.error('TEST LOG 1 - context logger')
    
        logger = get_custom_logger()
        logger.addHandler(get_logstash_handler())
        logger.error('TEST LOG 2 - custom logger')
    
        logger = get_prefect_logger()
        logger.addHandler(get_logstash_handler())
        logger.error('TEST LOG 3 - util logger')
    
        return
    
    
    with Flow("test_flow") as flow:
        test_task()
    
    task_logger = get_prefect_logger('Test Task')
    task_logger.addHandler(get_logstash_handler())
    
    if __name__ == "__main__":
        flow.run_config = LocalRun()
        #flow.run()
        #flow.register(project_name='localhost')
    Is there something I'm missing in this setup?
    s
    k
    • 3
    • 16
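When a handler seems to be ignored, as in the logging question above, one diagnostic is to list which handlers a record from a given logger would actually reach in the process that runs the task. The sketch below uses only the standard `logging` module and makes no claim about the cause in this thread. `effective_handlers` and the `demo_prefect` logger names are hypothetical.

```python
import logging


def effective_handlers(logger):
    """Collect the handlers a record from `logger` would reach via propagation."""
    found = []
    current = logger
    while current is not None:
        found.extend(current.handlers)
        if not current.propagate:
            break  # records stop travelling up the hierarchy here
        current = current.parent
    return found


parent = logging.getLogger("demo_prefect")
parent.propagate = False  # keep the demo independent of root logger handlers
child = logging.getLogger("demo_prefect.Test Task")

handler = logging.NullHandler()
parent.addHandler(handler)

# The child has no handlers of its own but inherits the parent's handler.
reached_with_propagation = effective_handlers(child)

# With propagation switched off on the child, the parent's handler is not reached.
child.propagate = False
reached_without_propagation = effective_handlers(child)

print(len(reached_with_propagation), len(reached_without_propagation))
```

Calling such a helper from inside the task body shows the state of the process that actually executes the run, which can differ from the process that registered the flow.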
  • m

    Matheus Cruz

    06/30/2021, 12:33 PM
    hello everyone! Could someone tell me what features Open Source does not have in relation to Prefect Cloud and if it has any performance loss? I tried to find this information on the website but I got a little confused
    z
    • 2
    • 3
  • j

    Jacob Baruch

    06/30/2021, 1:27 PM
    Hi all! We want to deploy Prefect using Kubernetes on Azure. Our dev team is having a few problems with that. Does anyone have experience with this? Are there any docs that helped you? Thanks!
    k
    m
    • 3
    • 2
  • b

    Bruno Murino

    06/30/2021, 2:03 PM
    Hi everyone — I have a task that returns a python dictionary, now I’m trying to use one of the elements of the dictionary as input for another task, but I’m getting an error.. Here’s a minimal working example:
    import prefect
    from prefect import task, Flow, Parameter, unmapped, case
    from prefect.tasks.shell import ShellTask
    
    shell_task = ShellTask(stream_output=True)
    
    @task
    def get_config():
        return {
            'path': '/root/',
        }
    
    with Flow("test") as flow:
        config = get_config()
        bar = shell_task(command="pwd", helper_script = f"cd {config['path']}")
    
    flow.run()
    k
    • 2
    • 4
  • b

    Bruno Murino

    06/30/2021, 2:04 PM
    and the logs:
    [2021-06-30 14:03:32+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config': Starting task run...
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config': Finished task run for task with final state: 'Success'
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
    [2021-06-30 14:03:32+0000] INFO - prefect.ShellTask | /tmp/prefect-if2skw8w: line 1: syntax error near unexpected token `newline'
    [2021-06-30 14:03:32+0000] INFO - prefect.ShellTask | /tmp/prefect-if2skw8w: line 1: `cd <Task: get_config['path']>'
    [2021-06-30 14:03:32+0000] ERROR - prefect.ShellTask | Command failed with exit code 2
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('Command failed with exit code 2')
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Failed'
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config['path']': Starting task run...
    [2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config['path']': Finished task run for task with final state: 'Success'
    [2021-06-30 14:03:32+0000] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
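The log line `cd <Task: get_config['path']>` above suggests that the f-string was evaluated while the flow was being built, when `config['path']` was still a placeholder rather than a value. The sketch below reproduces that effect in plain Python without Prefect. `Deferred` is a hypothetical stand-in for such a placeholder, and `make_helper_script` is an invented helper.

```python
class Deferred:
    """Tiny stand-in for a value that only exists once the pipeline runs."""

    def __init__(self, name, compute):
        self.name = name
        self.compute = compute

    def __repr__(self):
        return f"<Task: {self.name}>"

    def resolve(self):
        """Produce the real value, as the pipeline would at run time."""
        return self.compute()


config_path = Deferred("get_config['path']", lambda: "/root/")

# Build time: the f-string formats the placeholder itself, not its future value.
built_too_early = f"cd {config_path}"


def make_helper_script(path):
    """Format the command from a real value."""
    return f"cd {path}"


# Run time: resolve the value first, then format it.
built_at_run_time = make_helper_script(config_path.resolve())

print(built_too_early)
print(built_at_run_time)
```

In the flow itself, the equivalent would presumably be a small task that takes the path and returns the command string, with its result passed as helper_script, so that the formatting happens at run time.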