prefect-community
  • f

    Florian Kühnlenz

    08/04/2022, 2:02 PM
    Is there any writeup on why Prefect is built around the idea of storing code in block storage instead of a Docker image? And related, @Anna Geller, I saw you are planning to write some deployment recipes. Would it be possible to also write one that packs everything into a Docker image instead of splitting dependencies and flow code?
    👀 3
    k
    a
    • 3
    • 6
  • m

    Matt Delacour

    08/04/2022, 2:25 PM
    Why can't I create users with the GraphQL API? Even though my account is an admin one ... 🤔 What kind of permissions do I need?
    👀 1
    z
    • 2
    • 9
  • t

    Tony Yun

    08/04/2022, 3:18 PM
    Hi, how can I import SFTP task class?
    prefect.tasks.sftp.sftp
    I’ve tried versions 1.20 and 2.0, but neither has this module available.
    s
    • 2
    • 4
  • m

    Matt Delacour

    08/04/2022, 3:38 PM
    And do we have a list of what each of those roles means? For example, what is "audit trail"? Should my normal "users" have read access to it? 🤷
    👀 4
    z
    • 2
    • 6
  • d

    Dylan McReynolds

    08/04/2022, 3:46 PM
    Hello, I'm evaluating using prefect 2.0 with a local server. I think I'm missing a very basic concept about deployment. I want to set up a flow that is called many times a day, each time given a different data file as the parameter. I can do this quite easily locally by calling my flow's method with the file path as the parameter. But if I want to use the Orion server to manage these flow runs, I don't see how to launch a flow from an external application sending a different parameter value as the parameter value is tied to the deployment. It seems strange to have to create a deployment each time with this parameter (lots of extra work, and will quickly have thousands of deployments each used exactly once.)
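One way to read the question above is that a single deployment could be reused, with the parameter supplied per run. The sketch below only builds (and does not send) such a request with the standard library. It assumes the Orion REST API exposes `POST /deployments/{id}/create_flow_run` and accepts a `parameters` object in the body; the deployment ID and the `file_path` parameter name are placeholders, not values from this thread.

```python
import json
from urllib.request import Request


def build_flow_run_request(api_url: str, deployment_id: str, parameters: dict) -> Request:
    """Build (but do not send) a request that creates a flow run from an
    existing deployment, overriding its default parameters for this run."""
    # Assumed endpoint shape; check it against the API reference of your server version.
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters}).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder deployment ID and hypothetical parameter name.
request = build_flow_run_request(
    "http://127.0.0.1:4200/api",
    "00000000-0000-0000-0000-000000000000",
    {"file_path": "/data/run_0001.csv"},
)
# urllib.request.urlopen(request) would submit it to a running server.
```

An external application could build one such request per data file, so the deployment itself is created only once.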
    k
    p
    • 3
    • 7
  • c

    Chris Reuter

    08/04/2022, 3:55 PM
    Another livestream today with @Kalise Richmond starting in 1 hour, if you want to talk to someone live about Prefect 2.0! Register here to join!
    🙌 1
  • g

    Georvic Tur

    08/04/2022, 3:56 PM
    Hi, Community. Thank you in advance for your help. I have a question related to an issue we have seen in the past. The situation is as follows:
    1. Suppose there is a parent flow P with version 1 and a child flow C with version 1. They run daily.
    2. The parent flow invokes the child flow through StartFlowRun.
    3. Suppose P1 is running and is about to execute C1.
    4. However, all flows are re-registered sequentially. The end result is that P2 and C2 become available.
    5. The StartFlowRun task for C1 remained in a Pending state, and then there was a TriggerFailed error.
    6. C did not register any new flow run.
    7. Agents did not receive any information on C.
    So I was wondering whether concurrency problems are possible if flows of flows are running and new versions are suddenly registered for all of them.
    👀 1
  • w

    William Jamir

    08/04/2022, 4:37 PM
    Hello 🙂 I have a question about Work Queues (image in thread). I have two work queues with the same tag (staging). If I trigger multiple runs (let's say five), will these runs be evenly distributed between the available agents? I’m asking because I don't see a way to check which queue a task was executed on, and from the CLI output it looks like only one agent is picking up everything. I tested triggering the jobs with the agents off, and the list of flows appeared on both Work Queues as pending.
    k
    • 2
    • 11
  • m

    Mike Kovetsky

    08/04/2022, 4:40 PM
    Hey guys! I am having an issue running flows as KubernetesJob together with Prefect UI v2.0.2 in my GCP Kubernetes cluster. Let me describe all the steps. I guess this may be useful for others in the community, as the Prefect docs/how-tos quickly become outdated due to rapid development:
    • Save the output of prefect kubernetes manifest orion to k8s_deployment.yaml (please add the ability to set a non-default k8s namespace).
    • Change PREFECT_API_URL from http://orion:4200/api to http://0.0.0.0:4200/api.
    • kubectl apply -f k8s_deployment.yaml (it already has command: ["prefect", "orion", "start", "--host", "0.0.0.0", "--log-level", "WARNING"] for the API container).
    • Check the GCP admin. The agent/ui/api seem to be healthy.
    • kubectl port-forward deployment/orion 4200:4200
    • Check that the UI is healthy via http://127.0.0.1:4200/.
    • prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api
    • Register a GCS block via the admin panel: name=prefect, path: gs://{my-path}. (prefect block register -f gcs_block.py didn’t work for me for some reason.)
    • prefect deployment build ../flows/backtest.py:backtest_flow --name dev --tag dev --infra kubernetes-job --storage-block gcs/prefect (should I run it from the root of the project?)
    • Checked my GCS bucket. The files did appear in the local folder and in GCS.
    • prefect deployment apply backtest_flow-deployment.yaml
    • Check the UI at http://127.0.0.1:4200/deployments. The deployment did appear.
    • Create the kubernetes queue manually. The name seems to be hardcoded according to the error logs in the GCP agent. This step fixes the error.
    • prefect deployment run Backtest/dev
    • The flow run was created, but it is stuck in Pending.
    • The appropriate KubernetesJob was created, but it is stuck in an error state with the error All connection attempts failed. and ConnectionRefusedError: [Errno 111] Connect call failed ('0.0.0.0', 4200).
    Please help! Thank you in advance :)
    a
    d
    • 3
    • 12
  • w

    William Jamir

    08/04/2022, 5:09 PM
    Is it possible to start multiple agents from the command line by only informing the tag? For example, If I execute:
    prefect agent start -t test
    I can see that a new “Work Queue” is added to the UI (
    Agent queue test
    ) But if I start a new agent (in another tab) with the same command, the UI stays the same. Is this the expected behavior? Would it be possible to add a
    --name
    parameter on the start command to allow starting multiple agents to consume the same tag?
    o
    t
    • 3
    • 3
  • a

    aaron

    08/04/2022, 6:09 PM
    Hello! Is there a reason the credentials implementation in prefect-github is a Python dataclass and not a Prefect block (as it is in prefect-aws.credentials, for instance)?
    k
    a
    • 3
    • 5
  • c

    Chu

    08/04/2022, 6:25 PM
    Hi, a simple question about Prefect 1.0. I define a function (not decorated with the @task decorator). This function makes a third-party API call and returns a list of dictionaries. The use case is that this function will be called in the
    with Flow
    block and the list it returns will be used in
    create_flow_run.map(parameters=...)
    . My question is: once the flow is deployed (it runs every midnight), will the function we defined be called each time the flow is scheduled to run?
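The distinction behind this question can be illustrated with a toy model in plain Python. This is not Prefect code: `make_flow` merely stands in for the body of a `with Flow` block, and `fetch_items` for the undecorated function. It shows that plain code in a definition block executes when the flow object is built, not on each run. Whether Prefect rebuilds the flow object before each scheduled run may depend on how the flow is stored, which this sketch does not model.

```python
calls = {"build": 0, "run": 0}


def fetch_items():
    # Stand-in for the third-party API call made by an undecorated function.
    calls["build"] += 1
    return [{"id": 1}, {"id": 2}]


def make_flow():
    # Analogous to the body of a flow-definition block: plain Python here
    # executes immediately, while the flow is being constructed.
    items = fetch_items()

    def run():
        # Analogous to task code: executed once per flow run.
        calls["run"] += 1
        return [item["id"] for item in items]

    return run


flow_run = make_flow()  # definition time: fetch_items() is called once here
flow_run()              # first "scheduled" run reuses the captured list
flow_run()              # second run does not call fetch_items() again
```

In this model the list is fetched once and reused by every run, which is why run-time work is usually moved into a task.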
    n
    • 2
    • 3
  • j

    Jai P

    08/04/2022, 6:26 PM
    👋 peeking into the future a little bit for prefect (also apologies if this already exists and i missed it), do you think we might be able to specify different caching mediums (e.g. cache using a distributed cache like redis)?
    👀 1
    a
    j
    • 3
    • 4
  • k

    Keith

    08/04/2022, 6:40 PM
    Running a Prefect 2.0 Hello World workflow with a GKE cluster from the image
    prefecthq/prefect:2.0.2-python3.10
    and when trying to access GCS it appears that
    gcsfs
    is not installed and am getting the following error from my flow, should I build a custom image that installs
    gcsfs
    ?
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 232, in get_filesystem_class
        register_implementation(protocol, _import_class(bit["class"]))
      File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 255, in _import_class
        mod = importlib.import_module(mod)
      File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
      File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
    ModuleNotFoundError: No module named 'gcsfs'
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 312, in filesystem
        self._filesystem = fsspec.filesystem(scheme, **self.settings)
      File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 265, in filesystem
        cls = get_filesystem_class(protocol)
      File "/usr/local/lib/python3.10/site-packages/fsspec/registry.py", line 234, in get_filesystem_class
        raise ImportError(bit["err"]) from e
    ImportError: Please install gcsfs to access Google Storage
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 104, in with_injected_client
        return await fn(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=None, local_path=".")
      File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 454, in get_directory
        return await self.filesystem.get_directory(
      File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 251, in get_directory
        return self.filesystem.get(from_path, local_path, recursive=True)
      File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 315, in filesystem
        raise RuntimeError(
    RuntimeError: File system created with scheme 'gcs' from base path 'gcs://<bucket>/deployments' could not be created. You are likely missing a Python module required to use the given storage protocol.
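The traceback above ends in a failed import of `gcsfs`. A small preflight check like the following sketch, which uses only the standard library, could be run inside a candidate image to confirm that the modules a storage block needs are importable before deploying. The helper name is hypothetical.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names from `names` that cannot be found
    in the current Python environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Example: check for the module named in the ImportError above.
print(missing_modules(["gcsfs"]))
```

An empty list means every listed module can be located; any name in the result would need to be installed in the image.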
    ➕ 1
    j
    n
    • 3
    • 9
  • r

    Roger Webb

    08/04/2022, 7:20 PM
    How do you set up a parameter in a parent flow so that it is passed, after modification, into a child flow? I'm having problems using the parameter. I've tried...
    DOW = Parameter("DOW", default="Monday")
    FlowExecution = create_flow_run(
        flow_name=FlowName,
        project_name=ProjectName,
        task_args=dict(name="Flow Execution", trigger=all_successful),
        parameters={"StringA": "The Day of the week is " + DOW + "."},
    )
    and
    DOW = Parameter("DOW", default="Monday")
    StringAField = "The Day of the week is " + DOW + "."
    FlowExecution = create_flow_run(
        flow_name=FlowName,
        project_name=ProjectName,
        task_args=dict(name="Flow Execution", trigger=all_successful),
        parameters={"StringA": StringAField},
    )
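A sketch of one possible pattern, assuming the string should be assembled at run time from the resolved parameter value: a plain function builds the child flow's parameter dictionary. In a Prefect 1 flow it would presumably be wrapped with `@task` and its result passed as `parameters=`. The function name is hypothetical and this is not confirmed as the fix given in the thread.

```python
def build_child_parameters(dow: str) -> dict:
    """Assemble the child flow's parameters from the resolved day-of-week value."""
    return {"StringA": f"The Day of the week is {dow}."}


# With a concrete value, as the function would receive it at run time:
child_parameters = build_child_parameters("Monday")
```

Keeping the string formatting inside one function means the parameter value is only touched once it is an ordinary string.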
    ✅ 1
    a
    • 2
    • 4
  • j

    Jonathan Pou

    08/04/2022, 9:01 PM
    Hello! Has anyone been using Prefect 2.0 with Dask managed with Coiled? I am having issues with distributing a dataframe across multiple Dask workers within the same cluster. The cluster is deployed with 4 workers but the dataframe is only being loaded in 1 of the workers, leaving the 3 others idle. When I run the same script outside of a Prefect task & flow, it runs as expected - the dataframe gets distributed across the cluster for later use. I also tested another scenario where the cluster is deployed with only 1 worker, but with autoscaling enabled by passing adapt_kwargs={"maximum": 10} to DaskTaskRunner. In this scenario, it seems like the autoscaling request is ignored as the cluster remains of size 1. Lastly, is there a way to return the standard output of a print() statement executed on a remote dask cluster back to the Prefect logger?
    from prefect import flow, task
    from prefect_dask.task_runners import DaskTaskRunner
    import dask
    
    coiled_executor = DaskTaskRunner(
        cluster_class="coiled.Cluster",
        cluster_kwargs={
            "n_workers": 4,
            "software": "ttibi-dev",
            "shutdown_on_close": True,
            "worker_vm_types": ["r6a.large"],
        },
        adapt_kwargs={"maximum": 10},
    )


    @task
    def some_data_manipulation():
        df = dask.datasets.timeseries(
            "2000", "2020", partition_freq="2w"
        ).persist()
        df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()
        return df


    @flow(task_runner=coiled_executor)
    def test_flow():
        some_data_manipulation.submit()


    if __name__ == "__main__":
        test_flow()
    ✅ 1
    👀 1
    m
    a
    s
    • 4
    • 6
  • m

    Matt Delacour

    08/04/2022, 9:19 PM
    Me again. Why, if I update the permissions of a role to be only one thing, does it still have access to things I did not specify? Does it mean that those "Admin read values" are always ON for all users?
    ✅ 1
    a
    • 2
    • 6
  • c

    Chris Reuter

    08/04/2022, 9:24 PM
    Last chance to submit your Code Contest submissions! Please post them in #show-us-what-you-got by midnight Eastern time tonight!
    🎉 2
    🚀 2
    👍 2
  • h

    Hafsa Junaid

    08/04/2022, 11:08 PM
    Team, my Python program saves a CSV file as output in the same directory as the program. When I try to run this pipeline through Prefect 2.0 Cloud, I get this error. How can I get rid of it? The storage is configured by Prefect itself, and when I try to specify a storage block, the deployment.yaml file is not created. Also, is there any working Prefect 2.0 example that uses remote storage?
    ✅ 1
    a
    o
    • 3
    • 10
  • v

    Viet Nguyen

    08/05/2022, 1:51 AM
    Hi all, I'm wondering if we can manually kill a task_runner?
    m
    • 2
    • 1
  • m

    minhtuan

    08/05/2022, 3:11 AM
    Hi everyone, I'm trying to create a scheduled task in the newest version, but it doesn't seem to work. Can anyone help me? I've read the docs many times but couldn't find a way.
    Version: 2.0.3
    from prefect.schedules import IntervalSchedule
    ModuleNotFoundError: No module named 'prefect.schedules'
    c
    j
    k
    • 4
    • 4
  • m

    minhtuan

    08/05/2022, 3:15 AM
    https://docs.prefect.io/concepts/schedules/#the-scheduler-service
  • h

    Ha Pham

    08/05/2022, 4:38 AM
    Another thing regarding Schedules. According to this doc page:
    Deployments are changing, and along with them the way you specify a schedule on a deployment. Stay tuned for updated guidance.
    Currently it's not very clear to me how to set a schedule for my flows. Is it set in the flow code, or does it have to be set via the command line?
    m
    d
    k
    • 4
    • 6
  • j

    jcozar

    08/05/2022, 7:42 AM
    Hi everyone! I'm interested in open data catalog services. I'm trying open metadata and amundsen. I've seen that both only support Apache Airflow for ETLs orchestration. Do you know if Prefect is working with open metadata, amundsen, or other similar products? Thank you!
    a
    g
    • 3
    • 6
  • h

    Hamza Naanani

    08/05/2022, 9:11 AM
    Hello, I can't login on prefect cloud from my work computer using the CLI :
    prefect cloud login -k xxxxxxxxxxxxxxxx
    . I'm getting the following error :
    ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:997)
    . Is there a way to solve this? I am using a Windows machine and tried setting CURL_CA_BUNDLE='' and PYTHONHTTPSVERIFY='false', but it didn't work.
    s
    • 2
    • 5
  • s

    Sven Aoki

    08/05/2022, 9:35 AM
    I am not sure whether I am the only one facing this issue, but how can you deploy Prefect Orion permanently without using the cloud solution? In 1.0 it was fairly easy to adjust the docker-compose file. Now I can run the Orion server on our in-house cluster or a cloud VM, but since I am not able to docker-compose it (somehow the communication between the DB and Orion is prevented although they are on the same network), the server shuts down whenever the command line it was started from closes. Same for the agent. Maybe I am completely missing something here?
    f
    • 2
    • 2
  • h

    Ha Pham

    08/05/2022, 9:53 AM
    Hi all, I'm trying to deploy a flow in prefect 2.0 but keep encountering this "flow not found" error, even when the flow is clearly there. Is there anything in the .py file that can possibly prevent the flow from being detected?
    m
    • 2
    • 5
  • b

    Bartosz Kopytek

    08/05/2022, 10:18 AM
    Hi, I am trying to run deployment on K8s based on these docs: https://docs.prefect.io/tutorials/_kubernetes-flow-runner/ But when I try to run the command:
    prefect deployment create ./kubernetes-deployment.py
    I get this error:
    No such command 'create'.
    My Prefect version is 2.0.0. Does anyone know what might be the cause?
    o
    k
    • 3
    • 4
  • m

    Milan Valadou

    08/05/2022, 10:29 AM
    Hi I’m trying to use Blocks for using secrets in my ETL. However, I keep on running into errors when trying to load them:
    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    Input In [1], in <cell line: 6>()
          3 secret_block = Secret.load("etlharvestqaspassword")
          5 # Access the stored secret
    ----> 6 secret_block.get()
    
    AttributeError: 'coroutine' object has no attribute 'get'
    I defined the block within the Orion UI, because when I tried to define it via code in a simple script (as suggested here), I get the following kind of error:
    prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'http://ephemeral-orion/api/block_documents/'
    Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'name'], 'msg': 'name must only contain lowercase letters, numbers, and dashes', 'type': 'value_error'}, {'loc': ['body', '__root__'], 'msg': 'Names must be provided for block documents.', 'type': 'value_error'}], 'request_body': {'name': 'test2_password', 'data': {'value': 'test2'}, 'block_schema_id': '8019abd6-409a-4f91-9367-bc8343c31763', 'block_type_id': '29fb0ec8-f7e9-4527-984c-48f8675f2bc4', 'is_anonymous': False}}
    For more information check: https://httpstatuses.com/422
    I’m mostly using Prefect within a jupyter notebook and from within a virtualenv. Thanks in advance for anyone who could point me to what’s going on 🙂
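Both errors above can be reproduced in miniature with the standard library. The sketch assumes that `Secret.load` returns a coroutine when called where an event loop is already running (as in a notebook), which is what the `AttributeError` suggests; `FakeSecret` is a hypothetical stand-in, not the Prefect class. The name check mirrors the rule quoted in the 422 response (lowercase letters, numbers, and dashes), which the underscore in `test2_password` violates.

```python
import asyncio
import re


class FakeSecret:
    """Hypothetical stand-in for a block class whose load() is a coroutine."""

    def __init__(self, value):
        self._value = value

    @classmethod
    async def load(cls, name):
        return cls(f"value-for-{name}")

    def get(self):
        return self._value


def is_valid_block_name(name: str) -> bool:
    # Rule quoted in the 422 response: lowercase letters, numbers, and dashes only.
    return re.fullmatch(r"[a-z0-9-]+", name) is not None


async def main():
    pending = FakeSecret.load("etl-password")  # not awaited: a coroutine object
    has_get = hasattr(pending, "get")          # coroutine objects have no .get
    pending.close()                            # discard it without a "never awaited" warning
    block = await FakeSecret.load("etl-password")
    return has_get, block.get()


result = asyncio.run(main())
```

In a notebook cell, where a loop is already running, the equivalent of the fixed line would be a top-level `await` on the load call rather than `asyncio.run`.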
    m
    k
    s
    • 4
    • 10
  • o

    Oscar Björhn

    08/05/2022, 10:41 AM
    Does anyone know of a workaround to run a task from a task in Prefect 2? I've found some examples of people using .fn but it doesn't appear to work in my case. Specifically, I'd prefer to run trigger_dbt_cli_command from the prefect-dbt package from one of my tasks rather than having to create a flow to run it. I can't change the definition of trigger_dbt_cli_command, since the package itself defines it as a task. I think my problem might be related to the task being async, it seems to finish the execution instantly and the returned type is "None". As long as I'm not using fn(), Prefect will patiently wait for a result, but then I'm forced to call the task from a flow. Edit: If anyone thinks I'm going about this the wrong way, I'd be happy to hear that too. Maybe I just need to settle on always calling trigger_dbt_cli_command from a flow. 🙂
    ✅ 1
    k
    m
    • 3
    • 3
k

Khuyen Tran

08/05/2022, 3:27 PM
You can’t run a task inside a task. However, you can run a flow inside a flow, so you might want to replace your parent task with a flow instead
m

Mason Menges

08/05/2022, 3:28 PM
Hey @Oscar Björhn, Khuyen's 100% correct. If this is really the pattern you want, creating a subflow would make sense. That said, what's the reason for not wanting to call the trigger_dbt task from within the flow?
o

Oscar Björhn

08/05/2022, 4:38 PM
Thanks for both of your replies! The reason I wanted to do this is that I had a flow called something like "run dbt" or "run dbt tests" that called another util task I created, run_dbt. That task sets up all the connection parameters, injects variables into the statement to run, and so on, before finally calling trigger_dbt_cli_command with the constructed statement. I didn't see a purpose in "run dbt" showing up as its own flow. I created a workaround I'm happy with anyway: I converted run dbt to a function that returns a finished config dictionary instead, which I can easily pass to the trigger_dbt_cli_command task directly from the flow. Just a minor change in how things are set up. I don't mind changing things around for Prefect 2; the new solution isn't worse or better. I just wanted to make sure I wasn't missing something obvious, and now I know. 🎉
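The workaround described above might look roughly like the following sketch: a plain function, not a task, that assembles the dbt statement and returns a config dictionary a flow could unpack into the trigger_dbt_cli_command task. The function name, its arguments, and the shape of the dictionary are all hypothetical; only the idea of returning a finished config from a plain function comes from the thread.

```python
import json


def build_dbt_config(command: str, target: str, variables: dict) -> dict:
    """Assemble a dbt CLI statement and return it as keyword arguments
    for a downstream task (hypothetical shape)."""
    # dbt accepts --vars as a YAML/JSON string; JSON is used here.
    vars_json = json.dumps(variables)
    full_command = f"{command} --target {target} --vars '{vars_json}'"
    return {"command": full_command}


# Hypothetical values for illustration.
config = build_dbt_config("dbt run", "dev", {"day": "2022-08-05"})
```

Inside a flow, the dictionary could then be unpacked into the task call, so no task has to be invoked from another task.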
View count: 9