prefect-server
  • b

    Bo

    02/14/2022, 5:54 PM
    Hello, I am trying to run an ECS task while defining the task definition as a dictionary (https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/s3_ecs_run_task_definition_as_dict.py), but I keep receiving this error:
    An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Container.image should not be null or empty
I have defined the task definition as a YAML file (e.g. https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/s3_ecs_run_custom_task_definition.py) and that works fine, so I am not sure what I am doing wrong. Thanks!
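A minimal sketch of a task definition dict that satisfies RegisterTaskDefinition; the error suggests the containerDefinitions entry is missing its image key. The family name and ECR image URL below are illustrative:
from prefect.run_configs import ECSRun

run_config = ECSRun(
    task_definition={
        "family": "prefect-flow",
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",
        "cpu": "512",
        "memory": "1024",
        "containerDefinitions": [
            {
                # Prefect's ECS agent expects the container to be named "flow"
                "name": "flow",
                # this key must be non-empty, or RegisterTaskDefinition fails
                "image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/my-flow:latest",
            }
        ],
    },
)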
    k
    • 2
    • 15
  • b

    Ben Welsh

    02/14/2022, 8:37 PM
I have a private Python package bundled up and stored in Google Artifact Registry (as opposed to an open-source package on PyPI). I'd like to include it in my flow, which uses a Docker storage instance in production. I know that I can use the python_dependencies kwarg to the class to include open-source packages. But how do I get a private package from Google Artifact Registry included as well? Is there an established pattern for this?
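A sketch of one possible pattern, not verified against Artifact Registry specifics; the registry URL, repository, and package name are hypothetical placeholders, and the build still needs Google credentials available at image-build time:
from prefect.storage import Docker

storage = Docker(
    registry_url="us-docker.pkg.dev/my-project/my-repo",
    python_dependencies=["pandas", "requests"],  # open-source deps from PyPI
    extra_dockerfile_commands=[
        # keyring + keyrings.google-artifactregistry-auth lets pip authenticate
        # to Artifact Registry; alternatively bake in an access token at build time
        "RUN pip install keyring keyrings.google-artifactregistry-auth",
        "RUN pip install --extra-index-url "
        "https://us-python.pkg.dev/my-project/my-pypi-repo/simple/ my-private-package",
    ],
)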
    🙌 1
    k
    a
    • 3
    • 6
  • d

    David Serédi

    02/14/2022, 8:57 PM
Hey. I have started playing with Prefect Orion, and I use a different host (I need to access the UI from another machine). What I noticed after using the prefect orion start command (prefect orion start --host MY_IP) is that everything works fine (I can deploy flows and they run on schedule), except that the UI shows no flows or deployments. Am I doing something wrong? Thanks a lot
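A hedged note: the usual cause is the UI fetching from an API address other than the one the server binds to, so the server works but the UI shows nothing. In later Prefect 2 releases this is configured explicitly, e.g.:
export PREFECT_API_URL="http://MY_IP:4200/api"
Whether the Orion alphas honor this exact setting is an assumption worth checking against the 2.0a docs.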
    k
    • 2
    • 2
  • j

    jack

    02/15/2022, 12:25 AM
Anyone had success running an ECSTask on AWS Workspaces? There appears to be a bug where Docker's default "bridge" networking, when run on AWS Workspaces, does not allow any network traffic to/from the outside world. This means that when your Dockerfile gets to a line that needs the network, like RUN pip install -r requirements.txt, it just hangs.
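One workaround, assuming the Workspaces host itself can reach the network, is to run the build with host networking so the RUN steps bypass the bridge:
docker build --network=host -t my-flow-image .
The --network flag on docker build is standard Docker; whether it sidesteps the Workspaces restriction is an assumption to verify.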
    k
    • 2
    • 12
  • в

    Владислав Богучаров

    02/15/2022, 1:31 PM
Hello everyone! I use AWS for my work. On my macOS laptop, ~/.aws contains the configuration files that grant access to my work cloud. I want to test Prefect with another AWS account, but I'm afraid something might go wrong. As far as I understand, Prefect uses boto3, and boto3 will take its config from the default path. How do I keep the work AWS profile separate from the home one, and, most importantly, how do I tell Prefect which one to use?
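A minimal sketch using named profiles so the two accounts never share defaults; the profile names are illustrative:
# ~/.aws/credentials
# [work]
# aws_access_key_id = ...
# aws_secret_access_key = ...
# [personal]
# aws_access_key_id = ...
# aws_secret_access_key = ...

import boto3

# an explicit session pinned to the personal profile
session = boto3.session.Session(profile_name="personal")
s3 = session.client("s3")
Since Prefect's AWS tasks go through boto3's default credential chain, exporting AWS_PROFILE=personal in the environment where the flow runs is the least invasive way to point them at the right account.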
    a
    r
    +2
    • 5
    • 38
  • a

    Alvaro Durán Tovar

    02/15/2022, 7:52 PM
When using the k8s RunNamespacedJob task, is it possible to get the logs from the job and have them in the Prefect UI? I'm setting log_level but that isn't working.
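A sketch of the intended usage, assuming a Prefect 1.x version recent enough for this task to stream logs; job_body stands in for your Job manifest dict:
from prefect import Flow
from prefect.tasks.kubernetes import RunNamespacedJob

run_job = RunNamespacedJob(
    body=job_body,
    namespace="default",
    # should name a log level such as "info"; None disables log streaming
    log_level="info",
)

with Flow("k8s-job") as flow:
    run_job()
If logs still don't show up, checking the installed Prefect version is worthwhile, since log streaming was added to this task relatively late.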
    k
    • 2
    • 4
  • l

    Lana Dann

    02/15/2022, 8:18 PM
hi! i have a question on testing flows. if i have a flow that’s scheduled to run once a week, am i able to still run it locally using flow.run()? i’m trying to create an integration test now and it’s just stalling, so i wonder if it’s because the schedule that’s attached to it is waiting until the actual start time.
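Very likely, yes: flow.run() honors an attached schedule by default, so a test will sit waiting for the next scheduled start. A sketch of the test-friendly call (run_on_schedule is a documented kwarg on Flow.run in Prefect 1.x):
# runs once, immediately, ignoring the flow's schedule
state = flow.run(run_on_schedule=False)
assert state.is_successful()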
    a
    k
    • 3
    • 6
  • s

    Sharath Chandra

    02/16/2022, 8:04 AM
Hi, I am using Prefect to submit a Spark job. Spark is managed via k8s (https://spark.apache.org/docs/latest/running-on-kubernetes.html). I have created a subclass of ShellTask to invoke spark-submit. The issue I am facing is that when there are errors in the Spark job, the Prefect task still shows it as a success. I can see the pod status on k8s as failed, with errors in the log. How can we propagate the errors from Spark jobs to the Prefect task? The Task is defined as:
from typing import Any, List

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class K8sSparkSubmitTask(ShellTask):

    def __init__(
        self,
        command: str = None,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        # store the attributes that run() and defaults_from_attrs rely on
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs
        super().__init__(
            **kwargs,
            command=command,
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    @defaults_from_attrs("job_name", "conf_kwargs")
    def run(
        self,
        job_name: str = None,
        job_uri: str = None,
        job_args: List[str] = None,
        conf_kwargs: dict = None,
    ) -> str:
        command = self._build_command(
            master_uri=self.master_uri,
            deploy_mode=self.deploy_mode,
            job_name=job_name,
            job_uri=job_uri,
            job_args=job_args,
            conf_kwargs=conf_kwargs,
        )
        print(f"The spark-submit command is {command}")
        return super().run(command=command)
The Spark job is:
def main(args):
    spark_session, spark_logger, config_dict = init_spark()
    try:
        ...  # logic here
    except Exception as err:
        spark_logger.error(err)
        raise
    finally:
        if spark_session is not None:
            spark_session.stop()
    d
    • 2
    • 6
  • d

    Dotan Asselmann

    02/16/2022, 8:20 AM
Hey, is there a standard way (maybe via configuration) to manage data retention in the Postgres DB? Our PVC is almost full and I realised nothing is ever deleted.
    a
    • 2
    • 3
  • j

    Jovan Sakovic

    02/16/2022, 2:34 PM
    Hello 👋 I have an ETL Flow, using a LocalDaskExecutor, that maps the API extraction function across a list of thousands of parameters to query the API endpoint with. It’s running open source on a VM instance. It’s all running okay, extracts and loads into Snowflake, until the error in the thread reply occurs once. Then it just spirals down from there, slowly erroring out more frequently.
    k
    • 2
    • 6
  • m

    Michał

    02/16/2022, 2:42 PM
Hey, what ports should be open when working across 2 VPSs (the 1st runs the server and UI, the 2nd a local agent)? Right now I have ports 4200 and 8080 open, but when I start prefect server without cloud, I get the error below (on the second screenshot I replaced my server IP with localhost):
    a
    • 2
    • 2
  • j

    jack

    02/16/2022, 5:10 PM
What are the pieces required to get ECS logs into CloudWatch? The reason we are interested is that sometimes an ECS task fails to initialize, and so there are no Prefect logs to inspect.
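At minimum the container definitions need an awslogs logConfiguration, and the task's execution role needs permission to write log streams. A hedged sketch of the container-definition piece (group, region, and prefix are illustrative):
container_definition = {
    "name": "flow",
    "image": "my-image:latest",
    "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
            # the log group must already exist (or use awslogs-create-group)
            "awslogs-group": "/ecs/prefect-flows",
            "awslogs-region": "us-east-1",
            "awslogs-stream-prefix": "prefect",
        },
    },
}
The execution role also needs logs:CreateLogStream and logs:PutLogEvents; that IAM detail is what most often bites when a task dies before Prefect logging starts.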
    a
    k
    k
    • 4
    • 18
  • l

    Liam England

    02/16/2022, 5:58 PM
Hi folks, wondering if there's a way to limit the number of flow runs that can be spun up by a docker agent at a time, while maintaining a queue for incoming run requests? i.e. if 50 flow runs are submitted, spin up containers for at most 10 runs in parallel.
    a
    • 2
    • 2
  • b

    Bruno Nunes

    02/17/2022, 10:50 AM
Hi guys, I'm trying to create a task template in Prefect by passing a list of parameters. To keep the task generic it has only one argument, a Python list of dicts. The idea is to get my parameters, process them to build the Python list, and pass it to my tasks. The problem I'm seeing is that the Parameter is creating a task for each manipulation of the arguments. Thanks
    a
    • 2
    • 6
  • p

    Pedro Martins

    02/17/2022, 6:23 PM
Hey all! Does anyone know how to check whether a certain version of the prefect-server helm chart is compatible with a newer version of Kubernetes? For instance, I want to make sure that prefect-server-2021.09.02 (and other helm charts too) is compatible with Kubernetes v1.22. Thank you!
    a
    m
    • 3
    • 2
  • d

    Daniel

    02/17/2022, 9:49 PM
    Hello all, I'm trying to create Prefect Agents from the Azure Marketplace. It asks for an API key, but the one I generated from Prefect Cloud has more characters than the 22 maximum that Azure is asking for. Am I generating the wrong key? I'm doing it from Account Settings > API Keys.
    a
    • 2
    • 3
  • m

    Mariusz Olszewski

    02/18/2022, 10:43 AM
Is there a way to trigger a Prefect flow from an SNS/SQS event? How would one do it?
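There is no built-in SNS/SQS trigger, but a small AWS Lambda subscribed to the topic/queue can create a flow run through the API. A hedged sketch against the Prefect 1.x client; the flow_id and parameter names are hypothetical:
import prefect

def handler(event, context):
    # expects Prefect auth (e.g. an API key) to be configured in the Lambda env
    client = prefect.Client()
    for record in event.get("Records", []):
        client.create_flow_run(
            flow_id="<your-flow-id>",
            parameters={"payload": record.get("body")},
        )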
    a
    • 2
    • 4
  • j

    jcozar

    02/20/2022, 1:49 PM
Hi everyone! I am experiencing something unusual in Prefect Cloud. I have a flow scheduled every day at 5:00 am (UTC). I also have an automation (SLA) that cancels any flow run that does not start within 1h (and notifies Slack). Since 3 days ago I have been receiving notifications from that automation, even though the flow runs were executed correctly at 5:00 am (UTC) every day. For example, the last notification said that flow run 51ec0178-bd12-43c4-8b00-fa4ae61300ef was cancelled because of that automation, but that flow run does not exist in the dashboard, and a flow run executed correctly at the expected time. Do you know what might be happening? Thank you in advance!
    a
    • 2
    • 22
  • s

    Slava Shor

    02/20/2022, 4:19 PM
Hello everyone! I am Slava from Emporus. Prefect documentation has some examples of configurations for AWS ECS, but it doesn't give a comprehensive recipe for how to run a workflow in ECS using a Fargate cluster (or maybe I need a Google Premium account to find it 😊). After some trial and error, and by looking at other tutorials online, we seem to have found an almost-working solution. We are using a self-hosted Prefect server on EC2 in AWS, with the ECSRun and DaskExecutor configuration inside the flow definition script, to run the workflow in ECS on a Fargate cluster. We are also building a custom Docker image using Python 3.10. At first we had an issue with Dask compression, but sorted it out by providing the environment variable DASK_DISTRIBUTED__COMM__COMPRESSION=zlib, and now we're getting a puzzling exception. We didn't use Conda explicitly at any stage, and yet we see something weird: all of the stack trace is from Python 3.10 except the last line, which is from Python 3.8.
    k
    a
    • 3
    • 44
  • s

    Stéphan Taljaard

    02/21/2022, 4:45 AM
    Hi. The current Orion alpha is explicitly not recommended for production. How much closer to production-ready will the upcoming beta release be?
    k
    • 2
    • 3
  • m

    Michał

    02/21/2022, 6:56 AM
Hey, how do I register a flow that is split across multiple files? The first, main.py, has all the flow code; the second, a utility module, has some utility functions. When I do a local flow run it succeeds, but after registering the flow and running it from the server I get module not found: "utility".
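The execution environment needs the second module on its import path; registration only pickles the flow, not helper files. If you use Docker storage, one documented pattern is shipping the file into the image and extending PYTHONPATH (paths here are illustrative):
from prefect.storage import Docker

storage = Docker(
    files={
        # local path -> path inside the image
        "/home/me/project/utility.py": "/modules/utility.py",
    },
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
)
With a local agent, the equivalent fix is installing the helper module into the agent's environment (e.g. packaging it and running pip install -e .).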
    a
    • 2
    • 1
  • m

    Mathijs Carlu

    02/21/2022, 3:04 PM
Hi, I'm playing around with the latest version of prefect orion (2.0a12), which atm runs on a local kubernetes cluster. I would like to log from inside a task, but currently don't manage to do so. Logging from a flow works, however. How can I fix this? I'm using get_run_logger().info('message'), and the print() function also does not seem to produce any logs.
    a
    m
    • 3
    • 8
  • m

    madhav

    02/21/2022, 4:56 PM
Hi all - we are testing out production deployment strategies for our prefect agents. We have read: [1][2][3]. Currently we are trying to benchmark, test, and determine costs for deploying to ECS + Fargate, GKE with autopilot, and a single large VM with a local executor. Does anybody have rough comparisons by any chance? Cost of course will be sensitive to our specific workflow, but I was curious whether anyone has a good idea of how the costs approximately compare. [1] https://gist.github.com/palewire/072513a9940478370697323c0d15c6ec, [2] https://medium.com/the-prefect-blog/the-simple-guide-to-productionizing-data-workflows-with-docker-31a5aae67c0a, [3] https://rdrn.me/scaling-out-prefect/.
    a
    • 2
    • 5
  • в

    Владислав Богучаров

    02/21/2022, 7:39 PM
Hello! I'm looking for a mechanism that would let me avoid re-running a successfully completed task. For example, my result is stored in S3 and I don't need to overwrite it. Something like Luigi, where before executing, the task checks whether its output already exists. I understand that there are cache_for and cache_validator, but as I understand it they are time-based, and I need a consistently reliable mechanism. Or should I implement this myself?
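Prefect 1.x has exactly this in the form of targets: with checkpointing enabled, a task whose target already exists is not re-run; the stored result is loaded instead. A sketch, with the bucket and template names illustrative:
from prefect import task
from prefect.engine.results import S3Result

@task(
    result=S3Result(bucket="my-bucket"),
    # templated from the prefect context; if this key exists, the task is not re-run
    target="outputs/{task_name}/{today}",
    checkpoint=True,
)
def build_report():
    ...
Outside Cloud/Server runs, checkpointing also needs PREFECT__FLOWS__CHECKPOINTING=true in the environment.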
    a
    • 2
    • 2
  • s

    Sen

    02/22/2022, 2:07 PM
Hello everyone! This is Sen from South Africa. I have been struggling with a problem for some time now; I Googled a bit and can't find any direction for solving this issue. I am running my own Prefect server in an Azure VM, and I am running an agent on my local machine. The Prefect version is 0.13.18 everywhere. The Prefect agent was started on my machine using the command below:
prefect agent local start -l 'On_Prem_MapTest' -a "http://MY-SERVER-IP:4200"
Please find the code for the flow below:
    # Basic Imports
    import psutil
    import os
    import requests
    
# Point the Prefect client at the server
os.environ["PREFECT__SERVER__ENDPOINT"] = "http://MY-SERVER-IP:4200/graphql"
    
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.core.task import Task
    from prefect.engine import signals
    from prefect.engine.state import State
    from prefect.environments import LocalEnvironment
    from prefect.environments.storage import Docker, Azure
    from prefect.engine.executors import LocalDaskExecutor, DaskExecutor
    
    
    @task
    def create_url_list():
        """
        Given the main page html, creates a list of episode URLs
        """
    
        url_ids = [21.10, 21.04, 20.10, 20.04, 19.10, 19.04, 18.10, 18.04, 17.10, 17.04, 16.10, 16.04, 
                   15.10, 15.04, 14.10, 14.04, 13.10, 13.04, 12.10, 12.04, 11.10, 11.04, 10.10, 10.04, 09.10, 09.04, 
                   08.10, 08.04, 07.10, 07.04, 06.10, 06.06, 05.10, 05.04, 04.10, 06.10, 06.06, 05.10, 05.04, 04.10]
        
        urls = []
        for url_id in url_ids:
        urls.append('http://old-releases.ubuntu.com/releases/' + str(url_id))
    
        return urls
    
    
    @task
    def retrieve_url(url):
        print(url)
        html = requests.get(url)
        if html.ok:
            return str(len(html.content))
        else:
            return None
    
    
    def main():
        """Main Function"""
        
        with Flow(
            "On_Prem_MapTest",
        ) as flow:
    
            get_urls = create_url_list()
            url_results = retrieve_url.map(get_urls)
    
        # Registering the Flow as a Docker
        flow.environment = LocalEnvironment()
        # flow.storage = Local()
        
        flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)
        flow.register("SampleFlows", labels=["On_Prem_MapTest"])
        
        # The code below when uncommented works fine 
        # final_state = flow.run()
        # final_state_2 = final_state.result[url_results]
        # print('\n'.join([f'{s.result}: {s}' for s in final_state_2.map_states[:5]]))
    
    if __name__ == "__main__":
        main()
The problem is that when I try to run this flow via the agent, registering it and then using the Prefect UI to run it, it fails with the message below:
    Failed to retrieve task state with error: ClientError('400 Client Error: Bad Request for url: <http://MY-SERVER-IP:4200/graphql>\n\nThe following error messages were provided by the GraphQL server:\n\n    GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on\n        type "Mutation". Did you mean "get_or_create_task_run" or\n        "get_or_create_mapped_task_run_children"?\n\nThe GraphQL query was:\n\n    mutation {\n            get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {\n                id\n                version\n                serialized_state\n        }\n    }\n\nThe passed variables were:\n\n    null\n')
    Traceback (most recent call last):
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 360, in _send_request
        response.raise_for_status()
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/requests/models.py", line 953, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: <http://MY-SERVER-IP:4200/graphql> 
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 193, in initialize_run
        map_index=map_index,
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 1331, in get_task_run_info
        result = self.graphql(mutation)  # type: Any
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 303, in graphql
        retry_on_api_error=retry_on_api_error,
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 219, in post
        retry_on_api_error=retry_on_api_error,
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 445, in _request
        session=session, method=method, url=url, params=params, headers=headers
      File "/home/sen/anaconda3/envs/prefect_py37/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _send_request
        raise ClientError(f"{exc}\n{graphql_msg}") from exc
    prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: <http://MY-SERVER-IP:4200/graphql> 
    
    The following error messages were provided by the GraphQL server:
    
        GRAPHQL_VALIDATION_FAILED: Cannot query field "get_or_create_task_run_info" on
            type "Mutation". Did you mean "get_or_create_task_run" or
            "get_or_create_mapped_task_run_children"?
    
    The GraphQL query was:
    
        mutation {
                get_or_create_task_run_info(input: { flow_run_id: "9281d541-333a-4c1f-ad60-dd2d22edfa9b", task_id: "31ff1abe-1f1c-4409-a56f-6723bb8482f7", map_index: 0 }) {
                    id
                    version
                    serialized_state
            }
        }
    
    The passed variables were:
    
        null
But this works when run directly from the machine using the flow.run() command (commented out in the code above) without using the agent. I am not sure why it happens like this. It would be great if someone could point me in the right direction on how to solve this issue. I am also not sure how to iterate through the results of the mapped task url_results once all the mapped tasks are done. Thanks in advance,
    👋 3
    ✅ 1
    :discourse: 1
    👀 1
    a
    s
    • 3
    • 11
  • в

    Владислав Богучаров

    02/22/2022, 4:43 PM
Hi! Do I get it right that Prefect doesn't provide any Redshift helpers for uploading data, so we need to implement the upload logic ourselves (while at the same time there are tasks for BigQuery and Snowflake)? It's not a problem, but I want to be sure, because I didn't find any information about Redshift in the docs/GitHub/Slack.
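If the task library indeed has nothing Redshift-specific, a plain task issuing a COPY from S3 over a psycopg2 connection covers the upload path. A sketch; the host, credentials, table, and IAM role below are all illustrative:
import psycopg2
from prefect import task

@task
def copy_to_redshift(table: str, s3_uri: str, iam_role: str):
    conn = psycopg2.connect(
        host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",
        port=5439, dbname="dev", user="loader", password="...",
    )
    # the with-blocks commit on success and close the cursor
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                f"COPY {table} FROM '{s3_uri}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
            )
    conn.close()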
    a
    k
    • 3
    • 7
  • d

    Devin Flake

    02/23/2022, 12:22 AM
Hey all, I'm getting this error trying to apply the prefect helm chart. I'm using the Ingress Nginx controller and it's v1, not v1beta1; has anyone seen this before?
    helm install --namespace arte-prefect --version 2022.01.25 --values prefect-values.yaml arte-prefect prefecthq/prefect-server
    Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: unable to recognize "": no matches for kind "Ingress" in version "extensions/v1beta1"
    m
    • 2
    • 11
  • z

    Zhibin Dai

    02/24/2022, 12:49 AM
    Hi All! My team just started using Prefect and we are facing some growing pains. I have a flow that runs fine locally, but when I register it and run it through a Local Agent, the Agent is able to pick it up, but the flow is failing because it's not reading secrets stored in Prefect Cloud, even though the secrets are there. Please see screenshots. Any help is appreciated, Thanks!
    k
    • 2
    • 25
  • s

    Scarlett King

    02/24/2022, 1:06 PM
I’m trying to run prefect agent kubernetes start --job-template job_template-nonprod.yaml to start the agent. We’re currently setting up our Prefect Server on AKS and deploying via helm chart. Where can I store this file so that when the agent script runs, it picks the file up? Also, we have credentials stored in this file, so it will also need to be kept somewhere safe.
    a
    • 2
    • 2
  • b

    Bruno Nunes

    02/24/2022, 1:38 PM
Hello, I'm trying to find examples of subflows using imperative statements. Does anyone have an example they could share? Thanks, Bruno
    a
    • 2
    • 5
a

Anna Geller

02/24/2022, 1:47 PM
What do you mean by imperative statements?
If you mean the imperative API for task definition, you can check out this thread that discusses this topic more. If you have some custom classes, you can always call them in your tasks. And if you are looking for more subflow/flow-of-flows examples in general, check out this list.
b

Bruno Nunes

02/24/2022, 3:01 PM
I'm trying to do something like: ...
# Global parameters
BASE_DT = Parameter('BASE_DT', default='12312019')
ENTITY_ID = Parameter('ENTITY_ID', default='SASBank_1')
CYCLE_ID = Parameter('CYCLE_ID', default='10000')
FA_ID = Parameter('FA_ID', default='2022.1.1')
FA_PATH = Parameter(
    'FA_PATH', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1')
RUN_INSTANCE = Parameter(
    'RUN_INSTANCE', default='C:\\Development\\Temp\\Prefect_Cirrus_Core\\core\\2022.1.1\\prefect\\run_instance\\prefect-spre-10000-12312019')

# Instantiate the flow
flow_init = Flow("Initialize")
# # Initialize and run the tasks
initialize = RunSpreTask(name='Initialize', log_stdout=True)
flow_init.set_dependencies(task=initialize,
                           upstream_tasks=[],
                           keyword_tasks=dict(BASE_DT=BASE_DT,
                                              CYCLE_ID=CYCLE_ID,
                                              ENTITY_ID=ENTITY_ID,
                                              FA_ID=FA_ID,
                                              FA_PATH=FA_PATH,
                                              RUN_INSTANCE=RUN_INSTANCE,
                                              NODE_CODE='core_node_init.sas',
                                              RUN_OPTION='core_cfg.run_option',
                                              SYSTEM_OPTION='sys_cfg.run_option',
                                              FLOW_OPTION='core_res.flow_option',
                                              )
                           )

flow_data_enrichment = Flow("Data Enrichment")
filter_by_entity = RunSpreTask(name='Filter by Entity', log_stdout=True)
flow_data_enrichment.set_dependencies(task=filter_by_entity,
                                      upstream_tasks=[flow_init],
                                      keyword_tasks=dict(
                                          GROUP_FLG='N',
                                          NODE_CODE='core_node_filter_entity.sas',
                                          ENTITY_IN='core_lnd.entity',
                                          ENTITY_OUT='core_res.entity',
                                      )
                                      )
I'm getting an error because the prefect.context.parameter can't be resolved in my RunSpreTask class.
a

Anna Geller

02/24/2022, 3:04 PM
it’s probably easier to instantiate the task globally and then call it within your Flow, e.g.:
also, the parameters shouldn’t be defined globally, but rather within the Flow block. Check out this blog post, which provides many examples: https://www.prefect.io/blog/how-to-make-your-data-pipelines-more-dynamic-using-parameters-in-prefect/
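A sketch of that suggestion, using Bruno's RunSpreTask from above with the parameter list shortened; functional-API calls inside the Flow block wire the tasks together without set_dependencies:
from prefect import Flow, Parameter

# instantiate once, globally
initialize = RunSpreTask(name="Initialize", log_stdout=True)

with Flow("Initialize") as flow_init:
    # Parameters defined inside the Flow block
    BASE_DT = Parameter("BASE_DT", default="12312019")
    CYCLE_ID = Parameter("CYCLE_ID", default="10000")
    init = initialize(
        BASE_DT=BASE_DT,
        CYCLE_ID=CYCLE_ID,
        NODE_CODE="core_node_init.sas",
    )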