prefect-community
  • a

    Alex Furrier

    07/02/2021, 10:03 PM
    I'm trying to run a task in Prefect server that takes a Pandas DataFrame as input and outputs a python list. The server utilizes a Dask backend and as such can be fairly particular about serialization. Previously for tasks involving DataFrames I've been serializing them using
    prefect.engine.serializers.PandasSerializer
    and passing that to the task as a local result. Like so:
    @task(result=LocalResult(serializer=PandasSerializer(
        'csv', serialize_kwargs={'index': False})))
    def my_df_task(df: DataFrame) -> DataFrame:
    However using this method for the previously described task (input DF, output List) fails as it's trying to use the PandasSerializer to serialize the list. What's the most sensible way around this? My hacky workaround is to create a serializer that uses
    PandasSerializer
    methods for deserializing the DataFrame and
    PickleSerializer
    methods for serializing the list. I feel like there has to be a smarter way to do so.
    k
    • 2
    • 2
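    A minimal sketch of one possible approach, assuming Prefect 0.15-era APIs: a task's result serializer only governs that task's own return value (the DataFrame input is deserialized via the upstream task's result), so the list-returning task can simply use a PickleSerializer result while DataFrame-returning tasks keep the PandasSerializer:
    from prefect import task
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import PandasSerializer, PickleSerializer
    from pandas import DataFrame

    @task(result=LocalResult(serializer=PandasSerializer(
        'csv', serialize_kwargs={'index': False})))
    def my_df_task(df: DataFrame) -> DataFrame:
        return df

    # this task's own output (a list) is pickled; the DataFrame argument
    # is rehydrated via the upstream task's PandasSerializer result
    @task(result=LocalResult(serializer=PickleSerializer()))
    def my_list_task(df: DataFrame) -> list:
        return df.columns.tolist()  # hypothetical list output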
  • b

    Ben Muller

    07/02/2021, 10:20 PM
    Hey team, I have a little bit of confusion with the new release to
    0.15.0
    in the API keys section. I currently authenticate my agents with
    prefect auth login -t "foo"
    but in the UI I definitely have used values that are
    API keys
    . Does that mean I just need to update the prefect version on my agent and change the
    -t
    argument to
    --key
    ?
    k
    • 2
    • 1
  • b

    Ben Muller

    07/03/2021, 2:35 AM
    Hey Prefect community, I am trying to use the
    great expectations
    task that you have built in. I am getting an exception when I don't provide the
    validation_operators:
      action_list_operator:
    in my
    great_expectations.yml
    , but then after providing it I receive a warning:
    WARNING  great_expectations.data_context.types.base:base.py:1016 You appear to be using a legacy capability with the latest config version (3.0).
        Your data context with this configuration version uses validation_operators, which are being deprecated.  Please update your configuration to be compatible with the version number 3.
    Is this something to do with the version of
    ge
    that Prefect is using in the task?
    k
    • 2
    • 3
  • o

    Omar Sultan

    07/03/2021, 3:39 AM
    Hi guys, I am trying to run an agent on a remote machine, and any flow that I attempt to run gives the error ModuleNotFoundError("No module named 'C'"). Note that this is the case even for example flows from Prefect's website, and that the flows run normally when the agent is run from the same machine as the Prefect backend server or when the flow is run as flow.run() locally in Python.
    k
    • 2
    • 10
  • b

    Ben Collier

    07/05/2021, 10:08 AM
    Morning all - we’re seeing an odd message this morning, telling us that our flow is scheduled, but behind schedule. Normally it just kicks off. What could be causing this issue?
    j
    • 2
    • 4
  • e

    Evgenii

    07/05/2021, 3:55 PM
    Hello, I found that Prefect has a task to run Docker images, but can Prefect push my project (code, scripts) into the running image?
    k
    • 2
    • 3
  • b

    Ben Muller

    07/05/2021, 10:37 PM
    Hey Prefect, I have set up an automation to alert in Slack when a flow fails. The default message is good, but I want it to specify the project too; I can't seem to find a template variable for that here. Is there one?
    k
    m
    • 3
    • 3
  • w

    wiretrack

    07/05/2021, 11:04 PM
    Hey guys, having a hard time registering the
    kubernetes agent
    and keep getting
    [Errno -2] Name or service not known
    . Tried every variation for apollo: `prefect-apollo-service` (my svc name),
    http://prefect-apollo-service:4200/graphql
    , the
    https
    address, but can’t make it work. Any ideas on what I should be doing to get the agent to talk to Apollo? If I remove the
    http
    I get a different error:
    1 No connection adapters were found for 'prefect-apollo-service:4200/graphql'
    m
    • 2
    • 3
  • l

    Laura Vaida

    07/06/2021, 7:48 AM
    hi everybody, I'm trying to write a csv file to GCS. Does anybody know how to do that as a result? I can define the result object but don't know how to write to it. Thanks!
    @task(log_stdout=True)
    def write_order_data(dataframe):
        current_date = dt.today().strftime("%Y_%m_%d")
        #GCS_Result = GCSResult(bucket='uwg-mail', location='orders_import_sf' + '_' + current_date + '.csv')
        dataframe.to_csv('gs://uwg-mail/orders_import_sf.csv', header=True)
    ✅ 1
    m
    k
    • 3
    • 6
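    A possible alternative sketch, assuming prefect[google] is installed and GCS credentials are available to the flow run: rather than calling to_csv inside the task, let a GCSResult with a PandasSerializer checkpoint the returned DataFrame to the bucket (the location template is illustrative):
    from prefect import task
    from prefect.engine.results import GCSResult
    from prefect.engine.serializers import PandasSerializer

    gcs_result = GCSResult(
        bucket="uwg-mail",
        location="orders_import_sf_{today}.csv",  # formatted from prefect.context
        serializer=PandasSerializer("csv", serialize_kwargs={"index": False}),
    )

    @task(result=gcs_result, checkpoint=True, log_stdout=True)
    def write_order_data(dataframe):
        # whatever this task returns is written to gs://uwg-mail/... by the result
        return dataframe
    Checkpointing is on by default when running against a backend; locally it can be enabled with PREFECT__FLOWS__CHECKPOINTING=true.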
  • g

    Gabriel Santos

    07/06/2021, 4:20 PM
    Hi guys, sorry for my English. I am trying to set up a local agent in Heroku and everything runs well, but when I try to register a task via the Heroku bash I only receive
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/app/'
    Did I do something wrong?
    k
    • 2
    • 3
  • j

    Joseph Loss

    07/06/2021, 5:34 PM
    does anyone have experience running a docker image from a private registry (Amazon ECR) with the script stored on AWS CodeCommit?
    k
    • 2
    • 34
  • s

    Stéphan Taljaard

    07/06/2021, 5:47 PM
    Hi. Airflow and Dagster have "sensors" that can be used to kick off DAGs. What is the Prefect way of doing this? Run with a high-frequency schedule (e.g. every minute) and short-circuit the flow run using
    case
    ?
    m
    • 2
    • 2
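    A minimal sketch of that schedule-plus-short-circuit pattern, assuming Prefect 1.x-style APIs (the condition task is hypothetical):
    from datetime import timedelta

    import prefect
    from prefect import Flow, case, task
    from prefect.schedules import IntervalSchedule

    @task
    def new_data_available() -> bool:
        # hypothetical "sensor" check, e.g. poll an S3 prefix or an API
        return False

    @task
    def process():
        prefect.context.get("logger").info("New data found, processing...")

    schedule = IntervalSchedule(interval=timedelta(minutes=1))

    with Flow("sensor-style-flow", schedule=schedule) as flow:
        ready = new_data_available()
        with case(ready, True):
            process()  # skipped (not failed) whenever ready is False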
  • m

    Madison Schott

    07/06/2021, 7:53 PM
    Hi all, I keep getting this error after specifying the profiles_dir in the DbtShellTask - it is the directory where the file is located, so I'm not sure why it's not able to find it:
    fatal: Not a dbt project (or any of the parent directories). Missing dbt_project.yml file
    k
    c
    • 3
    • 25
  • w

    wiretrack

    07/06/2021, 8:25 PM
    Hey all, I’m digging a bit deeper into Prefect’s architecture, but I couldn’t quite understand the role of the three GraphQL services (Hasura, GraphQL and Apollo). I thought Hasura already provided an endpoint and the platform to build mutations on top of it. So why is the GraphQL service needed? And if it’s there to provide the mutations, the GraphQL service already exposes an endpoint as well, so why is Apollo needed? Probably just a dumb question, but my only experience with GraphQL was plugging Hasura on top of Postgres, so I’m a bit lost on why the server was structured that way. Any insights?
    k
    m
    • 3
    • 3
  • s

    Suchindra

    07/06/2021, 8:26 PM
    Hi Prefect team, We are considering adopting Prefect more seriously for our business. Can I get a sales contact to start a conversation?
    k
    k
    • 3
    • 4
  • z

    Zach Schumacher

    07/06/2021, 8:27 PM
    are there any downsides to using the dask executor? I’m curious why the
    LocalDaskExecutor
    isn’t the default executor, if dask is a dep of prefect anyways.
    k
    w
    • 3
    • 7
  • s

    Samuel Kohlleffel

    07/06/2021, 9:07 PM
    When using the azure task
    BlobStorageUpload
    is there any way to set
    overwrite=True
    so the files in the container are overwritten with the new uploaded files when my flow runs?
    k
    • 2
    • 3
  • p

    Peyton Murray

    07/06/2021, 11:45 PM
    I want to define a task that stores results before running it inside a flow, like this:
    import prefect as pf
    from prefect.engine.results import LocalResult
    
    @pf.task(checkpoint=True, result=LocalResult(dir=path_to_result))
    def my_task(a, b, c):
        return do_stuff(a, b, c)
    
    with pf.Flow('my flow') as flow:
        my_task(1, 2, 'foo')  # <--- I want to be able to specify path_to_result here
    
    flow.run()
    What's the right way to structure this to specify
    path_to_result
    at the indicated location?
    k
    • 2
    • 4
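    One possible answer, assuming Prefect 1.x: calling a task inside a Flow block accepts a task_args dict that copies the task with the given attributes overridden, so the result (and its directory) can be set per call; the directory below is illustrative:
    import prefect as pf
    from prefect.engine.results import LocalResult

    @pf.task(checkpoint=True)
    def my_task(a, b, c):
        return do_stuff(a, b, c)  # do_stuff as in the question

    with pf.Flow('my flow') as flow:
        # override the result for this particular call
        my_task(1, 2, 'foo', task_args={"result": LocalResult(dir="/tmp/my-results")})

    flow.run()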
  • m

    matta

    07/07/2021, 12:54 AM
    With the new update, my GitHub Action stopped working. I made a key, put it in my repo's secrets, and now I get this:
    FileNotFoundError: [Errno 2] No such file or directory: '/home/runner/.prefect/auth.toml'
    g
    r
    k
    • 4
    • 8
  • b

    Brad I

    07/07/2021, 1:12 AM
    I also had an issue with the k8s agent and the new API key. From the new docs it seemed like you should just remove or blank out the old AUTH_TOKEN env var, but when I created a new service account key and deployed it, I get a message like
    failed to authenticate, missing token
    . It works if I set the key in both variables; is this expected?
    env:
    - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
      value: XXXXXXXX
    - name: PREFECT__CLOUD__API
      value: https://api.prefect.io
    - name: PREFECT__BACKEND
      value: cloud
    - name: PREFECT__CLOUD__API_KEY
      value: XXXXXXXX
    - name: PREFECT__CLOUD__TENANT_ID
      value: TTTTTTTT
    image: prefecthq/prefect:0.15.0-python3.7
    k
    m
    t
    • 4
    • 12
  • b

    Ben Muller

    07/07/2021, 2:58 AM
    Hey Prefect, trying to find info on how to set my timezone, couldn't find anything here. Is there a way to make my Cron clock that fires at 1pm every day (be my 1pm)
    Australia/Sydney
    time?
    n
    • 2
    • 6
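    A short sketch, assuming Prefect 1.x clocks: CronClock interprets its cron string in the timezone of its start_date, so passing a pendulum datetime in Australia/Sydney makes "1pm" mean 1pm Sydney time:
    import pendulum
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    schedule = Schedule(clocks=[
        CronClock(
            "0 13 * * *",  # 1pm every day
            start_date=pendulum.datetime(2021, 7, 1, tz="Australia/Sydney"),
        )
    ])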
  • s

    Sean True

    07/07/2021, 8:31 AM
    Hi, I'm currently running a flow with a task that is not critical and often fails. I want to use a control flow task to execute the subsequent tasks with different inputs depending on whether the previous task failed. I could wrap the whole task in a try/except block and return a boolean success value, but I also want the task failure to appear in the Prefect UI, along with the error message. Is there any way to branch based on the state of a task, as opposed to the value of the result?
    ✅ 1
    s
    • 2
    • 2
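    One possible sketch, assuming Prefect 1.x triggers (task names are hypothetical): give each downstream branch a different trigger, so one branch runs only when the flaky task succeeds and the other only when it fails, while the failure itself still shows up in the UI:
    from prefect import Flow, task
    from prefect.triggers import all_successful, any_failed

    @task
    def flaky_task():
        raise RuntimeError("non-critical failure")

    @task(trigger=all_successful)
    def success_branch():
        ...

    @task(trigger=any_failed)
    def failure_branch():
        ...

    with Flow("branch-on-state") as flow:
        upstream = flaky_task()
        success_branch(upstream_tasks=[upstream])
        failure_branch(upstream_tasks=[upstream])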
  • k

    Krzysztof Nawara

    07/07/2021, 10:52 AM
    Hi, quick question about task slugs. Context: I'm executing my flow and it fails. I have the slugs of the tasks that failed (from flow.slugs[<task>]) and based on those I'd like to retrieve a task so that I can investigate its dependencies using flow.upstream_tasks(). However, when I look at flow.tasks, the slug for each task is null, and flow.get_tasks(slug=X) doesn't work. I could do this by reverse-mapping flow.slugs, which does seem to be populated, but it feels like a hack, so I wanted to make sure I'm not missing a simpler way first. Is that expected behaviour, or am I missing something?
    k
    • 2
    • 13
  • b

    Ben Collier

    07/07/2021, 10:58 AM
    Hi all - I’ve just noticed that some tasks set to “manual_only” aren’t respecting the trigger and are just running. Can anyone suggest why this might be?
    k
    • 2
    • 19
  • n

    nib

    07/07/2021, 12:14 PM
    Hi all. I have a state_handler for a flow where I’m trying to extract information about which of the tasks failed (
    new_state.result
    ) and error details. But it’s empty and I can get only “Some reference tasks failed.” from
    new_state
    . Is it possible to extract these kinds of details?
    k
    r
    • 3
    • 5
  • m

    Madison Schott

    07/07/2021, 2:30 PM
    Hi all, I am trying to create a flow where one task depends on the completion of another task within the flow. However, no data is passed from one task to the next. Following the documentation, this is what I have so far. How do I pass the values I want to pass to the
    FivetranSyncTask()?
    Also is it ok if I just have the
    dbt_task
    defined before this with the parameters needed?
    user_profile_w_campaign = Flow("User Profile with Campaign")
    user_profile_w_campaign.set_dependencies(
        task=dbt_task,
        upstream_tasks=[FivetranSyncTask()]
        )
    user_profile_w_campaign.run()
    k
    • 2
    • 12
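    A possible sketch using the functional API, assuming the Fivetran credentials and connector id are passed when the task is called (parameter names are illustrative; check the FivetranSyncTask signature), with dbt_task defined earlier as in the question:
    from prefect import Flow
    from prefect.tasks.fivetran import FivetranSyncTask

    fivetran_sync = FivetranSyncTask()

    with Flow("User Profile with Campaign") as flow:
        sync = fivetran_sync(
            api_key="...",       # illustrative
            api_secret="...",    # illustrative
            connector_id="...",  # illustrative
        )
        # dbt only starts once the Fivetran sync has finished; no data is passed
        dbt = dbt_task(upstream_tasks=[sync])

    flow.run()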
  • m

    Mike Wochner

    07/07/2021, 3:36 PM
    Is it possible to disable automatic input caching? The problem with the following flow setup is that the today_date variable seems to be cached from previous days, which is not what I intend. Is it possible to avoid this input caching, or is there a better way to structure this? The reason the capturing of the date happens at the beginning of the code is that the entire flow can run for a long time and cross into the next day.
    with Flow('Example') as flow:
    
        today_date = datetime.date.today().strftime("%Y-%m-%d")
    
        data = extract_data(security_list, today_date)
        load_data(data)
    
        ...
    
        
        more_data = extract_more_data(security_list)
        load_more_data(more_data, today_date)
    k
    • 2
    • 7
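    A sketch of one fix, assuming the root cause is that today_date is evaluated once at flow definition/registration time and baked into the flow as a constant: move the date lookup into a task so it is evaluated on every run:
    import datetime

    from prefect import Flow, task

    @task
    def get_today() -> str:
        # runs at flow-run time, so each run picks up the current date
        return datetime.date.today().strftime("%Y-%m-%d")

    with Flow('Example') as flow:
        today_date = get_today()

        data = extract_data(security_list, today_date)  # tasks from the question
        load_data(data)

        more_data = extract_more_data(security_list)
        load_more_data(more_data, today_date)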
  • a

    Amit

    07/07/2021, 5:12 PM
    Hi everyone, is there something wrong with the dashboard? It looks empty (on https://cloud.prefect.io/).
    n
    • 2
    • 8
  • a

    Amit

    07/07/2021, 5:17 PM
    Also, is it a recent requirement to deploy an agent with a label? Because flows are always deployed with a label, and an agent without a label would never pick them up.
    k
    • 2
    • 1
  • a

    ale

    07/07/2021, 5:33 PM
    Hey folks 🙂 We are planning to upgrade our Prefect deployment from 0.13.13 to the latest version. We have 40+ flows, which are built using Prefect Core 0.13.13. Flows use Docker storage and are stored in a private ECR repo. In production they run using the (deprecated) Prefect Fargate Agent, while for local development we use the Prefect Docker Agent. Ideally, we’d like to upgrade Prefect Server and Prefect Agents to 0.15 and keep flows at Prefect Core 0.13.13, since we can’t upgrade them all at once. I did some testing locally with Prefect Server and Prefect Docker Agent 0.15 and everything seems to work fine, but I haven’t had time to test extensively. Any suggestions on how to handle this upgrade process smoothly? 😊 Thanks!
    k
    m
    • 3
    • 35
a

ale

07/07/2021, 5:33 PM
Hey folks 🙂 We are planning to upgrade our Prefect deployment from 0.13.13 to the latest version. We have 40+ flows, which are built using Prefect Core 0.13.13. Flows use Docker storage and are stored in a private ECR repo. In production they run using the (deprecated) Prefect Fargate Agent, while for local development we use the Prefect Docker Agent. Ideally, we’d like to upgrade Prefect Server and Prefect Agents to 0.15 and keep flows at Prefect Core 0.13.13, since we can’t upgrade them all at once. I did some testing locally with Prefect Server and Prefect Docker Agent 0.15 and everything seems to work fine, but I haven’t had time to test extensively. Any suggestions on how to handle this upgrade process smoothly? 😊 Thanks!
k

Kevin Kho

07/07/2021, 7:11 PM
See this about 0.15.0 at the moment for server. I think upgrading server and agents to 0.15, while keeping flows at 0.13.13 may break. Normally the minor version increment will contain breaking changes. If it works for you, that’s great! It does give weird errors though when it doesn’t work. I think you’d have to move from FargateAgent (maybe to ECSAgent?) when it gets deprecated though.
a

ale

07/07/2021, 7:42 PM
So maybe we'd better upgrade server and agent to a less recent version, say 0.14.x, in order to minimize risk while keeping flows at 0.13.13?
k

Kevin Kho

07/07/2021, 7:43 PM
That will be fixed in the next release, and 0.15.0 might be worth the upgrade as it has some really convenient stuff. If you need to migrate now, then yeah, I guess 0.14.21.
a

ale

07/07/2021, 7:58 PM
I think we are going to upgrade first to 0.14.21, then upgrade flows and then upgrade to 0.15. Does this make sense?
k

Kevin Kho

07/07/2021, 7:58 PM
Yep!
a

ale

07/07/2021, 7:59 PM
Thanks Kevin! 🙌
Hey @Kevin Kho 🙂 Another question just came to my mind 😅 With the Fargate Agent we used to specify the
containerDefinitions_environment
so that we can pass environment variables to the task that runs the flow. Is this still possible with the ECS Agent?
k

Kevin Kho

07/08/2021, 2:50 PM
Let me check
a

ale

07/08/2021, 2:52 PM
👍
It was super helpful, because we declared the Agent task definition and the task environment using YAML in a CFN template
k

Kevin Kho

07/08/2021, 2:53 PM
I believe using the
env
of ECSRun will pass through
But you can also do it by supplying the task definition to ECSRun and it will be used when calling boto3's
register_task_definition
.
a

ale

07/08/2021, 2:56 PM
mmmh… but this way I need to know the env var values during flow development/registration, while with
containerDefinitions_environment
I can define env vars at the agent level and they are then passed to the flow…
Basically I need to understand how to pass env var values from the Agent to the flows that the Agent is going to start.
Let me give an example of the CFN we use to deploy the Fargate Agent
rAgentFargateTaskDefinition:
    Type: 'AWS::ECS::TaskDefinition'
    Metadata:
      cfn-lint:
        config:
          ignore_checks:
          - E1029
    DependsOn:
      - rFlowTaskRolePolicy
      - rAgentTaskRolePolicy
    Properties:
      TaskRoleArn: !Ref 'rAgentTaskRole'
      ExecutionRoleArn: !GetAtt 'rAgentExecutionRole.Arn'
      Family: !Sub '${pEnvironment}-prefect-fargate-agent'
      ContainerDefinitions:
        - Name: 'agent'
          Essential: true
          Command: ['prefect', 'agent', 'start', 'fargate', 'enable_task_revisions=true']
          Image: !Sub
            - 'prefecthq/prefect:${version}'
            - version: !If [ cIsPrefectServerVersionLatest, 'all_extras', !Sub 'all_extras-${pPrefectServerVersion}' ]
          MemoryReservation: 128
          Environment:
            - Name: PREFECT__CLOUD__API
              Value: !Sub '<masked value>'
            - Name: PREFECT__BACKEND
              Value: 'server'
            - Name: PREFECT__CLOUD__AGENT__LABELS
              Value: '["heavy"]'
            - Name: REGION_NAME
              Value: !Ref AWS::Region
            - Name: executionRoleArn
              Value: !GetAtt 'rFlowsExecutionRole.Arn'
            - Name: taskRoleArn
              Value: !GetAtt 'rFlowsTaskRole.Arn'
            - Name: networkMode
              Value: 'awsvpc'
            - Name: cluster
              Value: !Ref pEcsCluster
            - Name: networkConfiguration
              Value: !Sub
                - '{"awsvpcConfiguration": {"assignPublicIp": "DISABLED", "subnets": ["${subnets}"], "securityGroups": ["${pDWHClientSecurityGroup}"]}}'
                - subnets: !Join ['","', !Ref pFargateSubnetIds]
            - Name: cpu
              Value: '4096'
            - Name: memory
              Value: '8192'
            - Name: containerDefinitions_environment
              Value: !Sub
                - '[{
                      "name": "AWS_DEFAULT_REGION",
                      "value": "${AWS::Region}"
                    }, {
                      "name": "INTERNAL_SERVICES_CONNECTION_DATA_SECRET_ARN",
                      "value": "${rPrefectAgentInternalServicesConnectionDataSecret}"
                    }, {
                      "name": "DWH_JSON_CONNECTION_DATA_SECRET_ARN",
                      "value": "${rPrefectAgentDWHConnectionDataSecret}"
                    }, {
                      "name": "PLATFORM_DB_JSON_CONNECTION_DATA_SECRET_ARN",
                      "value": "${rPrefectAgentPlatformDBConnectionDataSecret}"
                    }, {
                      "name": "ENVIRONMENT",
                      "value": "${pEnvironment}"
                    }, {
                      "name": "DWH_QUERY_ROLE_ARN",
                      "value": "${pSafelakeDWHClusterRoleArn}"
                    }, {
                      "name": "EXTERNAL_SERVICES_SECRETS",
                      "value": "${rPrefectAgentExternalServicesSecret}"
                    }, {
                      "name": "DWH_PLATFORM_POSTGRES_USER_SECRET_ARN",
                      "value": "${pDWHPostgreSQLVersion11UserSecretArn}"
                    }, {
                      "name": "SAFELAKE_BUCKET_INGESTION_NAME",
                      "value": "${SafelakeBucketIngestionName}"
                    }, {
                      "name": "PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL",
                      "value": "${pSlackWebhook}"
                    }, {
                      "name": "ROLLBAR_ENABLED",
                      "value": "true"
                    }, {
                      "name": "SLACK_ENABLED",
                      "value": "true"
                    }, {
                      "name": "PREFECT__SERVER__UI__ENDPOINT",
                      "value": "<masked_value>"
                    }
                   ]'
                - SafelakeBucketIngestionName: !Select [1, !Split [':::', !Ref pDatalakeBucketArn]]
            - Name: containerDefinitions_logConfiguration
              Value: !Sub '{ "logDriver": "awslogs", "options": { "awslogs-group": "${rFlowsLogsGroup}", "awslogs-region": "${AWS::Region}", "awslogs-stream-prefix": "prefect-flow"}}'
          LogConfiguration:
            LogDriver: 'awslogs'
            Options:
              awslogs-group: !Ref 'rAppLogsGroup'
              awslogs-region: !Ref 'AWS::Region'
              awslogs-stream-prefix: 'prefect-agent'
      Tags:
        - Key: 'Name'
          Value: !Sub '${pEnvironment}-prefect-fargate-agent'
        - Key: 'ApplicationVersion'
          Value: !Ref 'pApplicationVersion'
k

Kevin Kho

07/08/2021, 3:01 PM
Values set using the
--env
CLI flag on the agent are also passed through. Is that what you need?
a

ale

07/08/2021, 3:02 PM
Yeah I think it could work! Can we pass values using
Environment
instead of the
--env
CLI flag?
k

Kevin Kho

07/08/2021, 3:06 PM
You mean setting it on the agent right? I’m honestly not sure, but I’d like to think you can
I’ll ask the team and get back
a

ale

07/08/2021, 3:08 PM
Yeah, setting them in the Agent Task Definition through the
Environment
parameter. Otherwise we’ll have to provide all values in the
command
parameter where we start the agent (passing values, as you said, using the
--env
CLI flag)
m

Mariia Kerimova

07/09/2021, 2:08 PM
I believe you can use env_vars on ECSAgent, and here you can see the precedence of populating env vars
a

ale

07/09/2021, 4:27 PM
Hey @Mariia Kerimova 🙂 Thanks! I understand that I can use the
--env
CLI flag to pass my env vars. I just need to understand if using the
Environment
in the task definition of the agent would work as well 🙂
m

Mariia Kerimova

07/09/2021, 10:10 PM
Just setting an env var on the agent task definition (without setting the
--env
CLI flag on agent or using
ECSAgent(env_vars=..)
or
ECSRun(env=..)
) will not pass the env var to the flow. I'll ask the team if we can have implicit env vars discovery
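A short sketch of the two programmatic options mentioned above, assuming Prefect 0.15-era APIs (the variable name and value are illustrative):
from prefect.agent.ecs.agent import ECSAgent
from prefect.run_configs import ECSRun

# option 1: agent-level env vars, passed to every flow run this agent starts
agent = ECSAgent(env_vars={"ENVIRONMENT": "staging"})

# option 2: per-flow env vars on the run config
run_config = ECSRun(env={"ENVIRONMENT": "staging"})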
a

ale

07/10/2021, 7:52 AM
Sorry, but it's a bit confusing 😅 What I'd like to do is define env vars at the agent level, so that these variables are available to all flows. Ideally, I would do that by setting the variables in the Environment parameter or using the --env CLI param in the Command parameter in the ECS task definition. Hopefully, these vars are then passed along to the flow run without any special settings in ECSRun. Would this work?
Any suggestions? 😊
m

Mariia Kerimova

07/12/2021, 7:43 PM
Hello @ale! Sorry for the delay in replying! Yes, if you set the
--env
CLI param in the Command parameter in the ECS task definition, the environment variables will be passed to the flow 🙂
what I meant earlier is that if you just add env variables in the Environment section of the agent's task definition in the CFN template without using
--env
or
ECSRun(env=..)
or
ECSAgent(env_var)
, the variables will not be passed to the flow runs, but if you specify variables using
--env
it should work 👍 I hope that makes sense; if not, let me know and I'll try to give you an example.
a

ale

07/12/2021, 7:56 PM
Hey @Mariia Kerimova 😊 An example of how to reference in
--env
the values defined in the Environment would be super appreciated!
m

Mariia Kerimova

07/12/2021, 8:48 PM
Sure, passing it with
--env
will look like this in the console:
Or in your template set:
Properties:
      TaskRoleArn: !Ref 'rAgentTaskRole'
      ExecutionRoleArn: !GetAtt 'rAgentExecutionRole.Arn'
      Family: !Sub '${pEnvironment}-prefect-fargate-agent'
      ContainerDefinitions:
        - Name: 'agent'
          Essential: true
          Command: ['prefect', 'agent', 'ecs', 'start', '--env', 'TEST_ECS_ENV=value_of_env_var']
then, if you enabled CloudWatch, check the agent’s logs to make sure the env vars are set
and I tested it with this flow:
from prefect.storage import Docker
from prefect.executors import LocalExecutor
from prefect.run_configs import ECSRun
from prefect import task, Flow
import prefect
import os

STORAGE = Docker(
    registry_url="mariiaprefect",
    image_name="envvars-testflow",
)

RUN_CONFIG = ECSRun(run_task_kwargs={"cluster": "default-cluster"}, labels=["masha", "envtoken"])
EXECUTOR = LocalExecutor()

@task
def print_me():
    logger = prefect.context.get("logger")
    env_value = os.environ['TEST_ECS_ENV']
    logger.info(f"Got the env var {env_value}")


with Flow(
    "Ale", storage=STORAGE, run_config=RUN_CONFIG
) as flow:
    print_me()
a

ale

07/12/2021, 9:06 PM
This is great! Thanks @Mariia Kerimova 😊 I assume we can also reference other CFN resources when assigning a value to TEST_ECS_ENV. For example, I should be able to set
TEST_ECS_ENV=${pEnvironment}
where pEnvironment is a parameter defined in the CFN template. Does this make sense?
m

Mariia Kerimova

07/12/2021, 9:08 PM
I’ve never used CFN, but I think it should work!
a

ale

07/12/2021, 9:16 PM
Thanks 😊 Much appreciated!
👍 1