Alex Furrier
07/02/2021, 10:03 PM
prefect.engine.serializers.PandasSerializer and passing that to the task as a local result. Like so:
@task(result=LocalResult(serializer=PandasSerializer(
    'csv', serialize_kwargs={'index': False})))
def my_df_task(df: DataFrame) -> DataFrame:
However, using this method for the previously described task (input DataFrame, output list) fails because it tries to use the PandasSerializer to serialize the list.
What's the most sensible way around this?
My hacky workaround is to create a serializer that uses PandasSerializer methods for deserializing the DataFrame and PickleSerializer methods for serializing the list.
I feel like there has to be a smarter way to do so.
Ben Muller
07/02/2021, 10:20 PM
15.0 in the API keys section.
I currently authenticate my agents with prefect auth login -t "foo", but in the UI I have definitely used values that are API keys.
Does that mean I just need to update the prefect version on my agent and change the -t argument to --key?
Ben Muller
07/03/2021, 2:35 AM
great expectations task that you have built in.
I am getting an exception when I don't provide the
validation_operators:
  action_list_operator:
in my great_expectations.yml, but then after providing it I receive a warning:
WARNING great_expectations.data_context.types.base:base.py:1016 You appear to be using a legacy capability with the latest config version (3.0).
Your data context with this configuration version uses validation_operators, which are being deprecated. Please update your configuration to be compatible with the version number 3.
Is this something to do with the version of ge that prefect are using in the task?
Omar Sultan
07/03/2021, 3:39 AM
Ben Collier
07/05/2021, 10:08 AM
Evgenii
07/05/2021, 3:55 PM
Ben Muller
07/05/2021, 10:37 PM
wiretrack
07/05/2021, 11:04 PM
kubernetes agent keep getting [Errno -2] Name or service not known. Tried every variation for apollo: `prefect-apollo-service` (my svc name), http://prefect-apollo-service:4200/graphql, the https address, can’t make it work. Any ideas on what I should be doing to get the agent to talk to apollo? If I remove the http I get a different error: 1 No connection adapters were found for 'prefect-apollo-service:4200/graphql'
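A note on the second error: `No connection adapters were found` is the message `requests` gives when a URL does not start with `http://` or `https://`, so the scheme needs to stay in the endpoint. Below is a minimal sketch of that check using only the standard library. The helper name `has_http_scheme` is hypothetical and not part of Prefect, and the check says nothing about whether the service name actually resolves inside the cluster.

```python
from urllib.parse import urlsplit


def has_http_scheme(url: str) -> bool:
    """Return True if the URL carries an explicit http or https scheme."""
    return urlsplit(url).scheme in ("http", "https")


# With the scheme present, the endpoint passes the check.
print(has_http_scheme("http://prefect-apollo-service:4200/graphql"))

# Without it, the text before the first colon is parsed as the scheme,
# so the check fails.
print(has_http_scheme("prefect-apollo-service:4200/graphql"))
```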
Laura Vaida
07/06/2021, 7:48 AM
@task(log_stdout=True)
def write_order_data(dataframe):
    current_date = dt.today().strftime("%Y_%m_%d")
    # GCS_Result = GCSResult(bucket='uwg-mail', location='orders_import_sf' + '_' + current_date + '.csv')
    dataframe.to_csv('gs://uwg-mail/orders_import_sf.csv', dataframe, header=True)
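The commented-out line suggests the goal is a date-stamped file name. Here is a minimal sketch of building such a path, assuming the bucket and prefix from the snippet above. The helper `dated_gcs_path` is hypothetical, and the date is passed in explicitly so the result is predictable.

```python
from datetime import date


def dated_gcs_path(bucket: str, prefix: str, day: date) -> str:
    """Build a gs:// path of the form gs://<bucket>/<prefix>_YYYY_MM_DD.csv."""
    return f"gs://{bucket}/{prefix}_{day.strftime('%Y_%m_%d')}.csv"


# Example with a fixed date; in a task, date.today() would be passed instead.
print(dated_gcs_path("uwg-mail", "orders_import_sf", date(2021, 7, 6)))
```

The returned string can be used as the first argument to `to_csv`; pandas relies on the `gcsfs` package being installed to write to `gs://` paths.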
Gabriel Santos
07/06/2021, 4:20 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/app/'
Did I do something wrong?
Joseph Loss
07/06/2021, 5:34 PM
Madison Schott
07/06/2021, 7:53 PM
fatal: Not a dbt project (or any of the parent directories). Missing dbt_project.yml file
wiretrack
07/06/2021, 8:25 PM
Suchindra
07/06/2021, 8:26 PM
Zach Schumacher
07/06/2021, 8:27 PM
DaskLocalExecutor isn’t the default executor, if dask is a dep of prefect anyways.
Samuel Kohlleffel
07/06/2021, 9:07 PM
BlobStorageUpload is there any way to set overwrite=True so the files in the container are overwritten with the new uploaded files when my flow runs?
Peyton Murray
07/06/2021, 11:45 PM
import prefect as pf
from prefect.engine.results import LocalResult
@pf.task(checkpoint=True, result=LocalResult(dir=path_to_result))
def my_task(a, b, c):
    return do_stuff(a, b, c)
with pf.Flow('my flow') as flow:
    my_task(1, 2, 'foo')  # <--- I want to be able to specify path_to_result here
flow.run()
What's the right way to structure this to specify path_to_result at the indicated location?
matta
07/07/2021, 12:54 AM
FileNotFoundError: [Errno 2] No such file or directory: '/home/runner/.prefect/auth.toml'
Brad I
07/07/2021, 1:12 AM
failed to authenticate, missing token. It works if I set the key in both variables, is this expected?
env:
  - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
    value: XXXXXXXX
  - name: PREFECT__CLOUD__API
    value: https://api.prefect.io
  - name: PREFECT__BACKEND
    value: cloud
  - name: PREFECT__CLOUD__API_KEY
    value: XXXXXXXX
  - name: PREFECT__CLOUD__TENANT_ID
    value: TTTTTTTT
image: prefecthq/prefect:0.15.0-python3.7
Ben Muller
07/07/2021, 2:58 AM
Australia/Sydney time?
Sean True
07/07/2021, 8:31 AM
Krzysztof Nawara
07/07/2021, 10:52 AM
Ben Collier
07/07/2021, 10:58 AM
nib
07/07/2021, 12:14 PM
new_state.result) and error details. But it’s empty and I can only get “Some reference tasks failed.” from new_state. Is it possible to extract these kinds of details?
Madison Schott
07/07/2021, 2:30 PM
FivetranSyncTask()?
Also, is it OK if I just have the dbt_task defined before this with the parameters needed?
user_profile_w_campaign = Flow("User Profile with Campaign")
user_profile_w_campaign.set_dependencies(
    task=dbt_task,
    upstream_tasks=[FivetranSyncTask()]
)
user_profile_w_campaign.run()
Mike Wochner
07/07/2021, 3:36 PM
with Flow('Example') as flow:
    today_date = datetime.date.today().strftime("%Y-%m-%d")
    data = extract_data(security_list, today_date)
    load_data(data)
    ...
    more_data = extract_more_data(security_list)
    load_more_data(more_data, today_date)
Amit
07/07/2021, 5:12 PM
Amit
07/07/2021, 5:17 PM
ale
07/07/2021, 5:33 PM
ale
07/07/2021, 5:33 PM
Kevin Kho
07/07/2021, 7:11 PM
ale
07/07/2021, 7:42 PM
Kevin Kho
07/07/2021, 7:43 PM
ale
07/07/2021, 7:58 PM
Kevin Kho
07/07/2021, 7:58 PM
ale
07/07/2021, 7:59 PM
containerDefinitions_environment so that we can pass environment variables to the task that runs the flow.
Is this still possible with the ECS Agent?
Kevin Kho
07/08/2021, 2:50 PM
ale
07/08/2021, 2:52 PM
Kevin Kho
07/08/2021, 2:53 PM
env of ECSRun will pass through register_task_definition.
ale
07/08/2021, 2:56 PM
containerDefinitions_environment I can define env var at the agent level and then they are passed to the flow…
rAgentFargateTaskDefinition:
  Type: 'AWS::ECS::TaskDefinition'
  Metadata:
    cfn-lint:
      config:
        ignore_checks:
          - E1029
  DependsOn:
    - rFlowTaskRolePolicy
    - rAgentTaskRolePolicy
  Properties:
    TaskRoleArn: !Ref 'rAgentTaskRole'
    ExecutionRoleArn: !GetAtt 'rAgentExecutionRole.Arn'
    Family: !Sub '${pEnvironment}-prefect-fargate-agent'
    ContainerDefinitions:
      - Name: 'agent'
        Essential: true
        Command: ['prefect', 'agent', 'start', 'fargate', 'enable_task_revisions=true']
        Image: !Sub
          - 'prefecthq/prefect:${version}'
          - version: !If [ cIsPrefectServerVersionLatest, 'all_extras', !Sub 'all_extras-${pPrefectServerVersion}' ]
        MemoryReservation: 128
        Environment:
          - Name: PREFECT__CLOUD__API
            Value: !Sub '<masked value>'
          - Name: PREFECT__BACKEND
            Value: 'server'
          - Name: PREFECT__CLOUD__AGENT__LABELS
            Value: '["heavy"]'
          - Name: REGION_NAME
            Value: !Ref AWS::Region
          - Name: executionRoleArn
            Value: !GetAtt 'rFlowsExecutionRole.Arn'
          - Name: taskRoleArn
            Value: !GetAtt 'rFlowsTaskRole.Arn'
          - Name: networkMode
            Value: 'awsvpc'
          - Name: cluster
            Value: !Ref pEcsCluster
          - Name: networkConfiguration
            Value: !Sub
              - '{"awsvpcConfiguration": {"assignPublicIp": "DISABLED", "subnets": ["${subnets}"], "securityGroups": ["${pDWHClientSecurityGroup}"]}}'
              - subnets: !Join ['","', !Ref pFargateSubnetIds]
          - Name: cpu
            Value: '4096'
          - Name: memory
            Value: '8192'
          - Name: containerDefinitions_environment
            Value: !Sub
              - '[{
                  "name": "AWS_DEFAULT_REGION",
                  "value": "${AWS::Region}"
                }, {
                  "name": "INTERNAL_SERVICES_CONNECTION_DATA_SECRET_ARN",
                  "value": "${rPrefectAgentInternalServicesConnectionDataSecret}"
                }, {
                  "name": "DWH_JSON_CONNECTION_DATA_SECRET_ARN",
                  "value": "${rPrefectAgentDWHConnectionDataSecret}"
                }, {
                  "name": "PLATFORM_DB_JSON_CONNECTION_DATA_SECRET_ARN",
                  "value": "${rPrefectAgentPlatformDBConnectionDataSecret}"
                }, {
                  "name": "ENVIRONMENT",
                  "value": "${pEnvironment}"
                }, {
                  "name": "DWH_QUERY_ROLE_ARN",
                  "value": "${pSafelakeDWHClusterRoleArn}"
                }, {
                  "name": "EXTERNAL_SERVICES_SECRETS",
                  "value": "${rPrefectAgentExternalServicesSecret}"
                }, {
                  "name": "DWH_PLATFORM_POSTGRES_USER_SECRET_ARN",
                  "value": "${pDWHPostgreSQLVersion11UserSecretArn}"
                }, {
                  "name": "SAFELAKE_BUCKET_INGESTION_NAME",
                  "value": "${SafelakeBucketIngestionName}"
                }, {
                  "name": "PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL",
                  "value": "${pSlackWebhook}"
                }, {
                  "name": "ROLLBAR_ENABLED",
                  "value": "true"
                }, {
                  "name": "SLACK_ENABLED",
                  "value": "true"
                }, {
                  "name": "PREFECT__SERVER__UI__ENDPOINT",
                  "value": "<masked_value>"
                }
                ]'
              - SafelakeBucketIngestionName: !Select [1, !Split [':::', !Ref pDatalakeBucketArn]]
          - Name: containerDefinitions_logConfiguration
            Value: !Sub '{ "logDriver": "awslogs", "options": { "awslogs-group": "${rFlowsLogsGroup}", "awslogs-region": "${AWS::Region}", "awslogs-stream-prefix": "prefect-flow"}}'
        LogConfiguration:
          LogDriver: 'awslogs'
          Options:
            awslogs-group: !Ref 'rAppLogsGroup'
            awslogs-region: !Ref 'AWS::Region'
            awslogs-stream-prefix: 'prefect-agent'
    Tags:
      - Key: 'Name'
        Value: !Sub '${pEnvironment}-prefect-fargate-agent'
      - Key: 'ApplicationVersion'
        Value: !Ref 'pApplicationVersion'
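The `containerDefinitions_environment` value in the template above is a JSON list of `name`/`value` objects. Below is a minimal sketch of generating that string from a plain mapping, in case hand-writing the JSON becomes error-prone. The helper `to_container_env` and the sample values are hypothetical; nothing here is part of Prefect or CloudFormation.

```python
import json
from typing import Dict, List


def to_container_env(env: Dict[str, str]) -> str:
    """Serialize a mapping into a JSON list of {"name": ..., "value": ...} objects."""
    entries: List[Dict[str, str]] = [
        {"name": name, "value": value} for name, value in env.items()
    ]
    return json.dumps(entries)


# Sample values only; the real template substitutes CloudFormation references.
print(to_container_env({"ROLLBAR_ENABLED": "true", "SLACK_ENABLED": "true"}))
```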
Kevin Kho
07/08/2021, 3:01 PM
--env CLI flag on the agent are also passed through. Is that what you need?
ale
07/08/2021, 3:02 PM
Environment instead of the --env CLI flag?
Kevin Kho
07/08/2021, 3:06 PM
ale
07/08/2021, 3:08 PM
Environment parameter.
Otherwise we’ll have to provide all values in the command parameter where we start the agent (passing values as you said using --env CLI flag)
ale
07/09/2021, 4:27 PM
--env CLI flag to pass my env vars.
I just need to understand if using the Environment in the task definition of the agent would work as well :simple_smile:
Mariia Kerimova
07/09/2021, 10:10 PM
--env CLI flag on agent or using ECSAgent(env_vars=..) or ECSRun(env=..)) will not pass the env var to the flow. I'll ask the team if we can have implicit env vars discovery
ale
07/10/2021, 7:52 AM
Mariia Kerimova
07/12/2021, 7:43 PM
--env CLI param in the Command parameter in the ECS task definition, the environment variables will be passed to the flow 🙂
--env or ECSRun(env=..) or ECSAgent(env_var), the variables will not be passed to the flow runs, but if you specify variables using --env it should work 👍 I hope it makes sense, if not, let me know and I'll try to give you an example.
ale
07/12/2021, 7:56 PM
--env the values defined in the Environment would be super appreciated!
Mariia Kerimova
07/12/2021, 8:48 PM
--env will look like this in the console:
Properties:
  TaskRoleArn: !Ref 'rAgentTaskRole'
  ExecutionRoleArn: !GetAtt 'rAgentExecutionRole.Arn'
  Family: !Sub '${pEnvironment}-prefect-fargate-agent'
  ContainerDefinitions:
    - Name: 'agent'
      Essential: true
      Command: ['prefect', 'agent', 'ecs', 'start', '--env', 'TEST_ECS_ENV=value_of_env_var']
from prefect.storage import Docker
from prefect.executors import LocalExecutor
from prefect.run_configs import ECSRun
from prefect import task, Flow
import prefect
import os
STORAGE = Docker(
    registry_url="mariiaprefect",
    image_name="envvars-testflow",
)
RUN_CONFIG = ECSRun(run_task_kwargs={"cluster": "default-cluster"}, labels=["masha", "envtoken"])
EXECUTOR = LocalExecutor()
@task
def print_me():
    logger = prefect.context.get("logger")
    env_value = os.environ['TEST_ECS_ENV']
    logger.info(f"Got the env var {env_value}")
with Flow(
    "Ale", storage=STORAGE, run_config=RUN_CONFIG
) as flow:
    print_me()
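One caveat with the test flow above: `os.environ['TEST_ECS_ENV']` raises `KeyError` when the variable did not reach the flow-run container, which is the very case being debugged in this thread. Here is a minimal sketch of a lookup that reports a missing variable instead of failing. The helper `describe_env` is hypothetical, and the optional `environ` argument exists only so the function can be exercised without touching the real environment.

```python
import os
from typing import Mapping, Optional


def describe_env(name: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return a log-friendly message about whether an env var is present."""
    # Fall back to the process environment when no mapping is supplied.
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if value is None:
        return f"{name} is not set in this container"
    return f"Got the env var {value}"


# Inside the task, the result could be handed to logger.info(...).
print(describe_env("TEST_ECS_ENV", {"TEST_ECS_ENV": "value_of_env_var"}))
print(describe_env("TEST_ECS_ENV", {}))
```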
ale
07/12/2021, 9:06 PM
TEST_ECS_ENV=${pEnvironment} where pEnvironment is a parameter defined in the CFN template.
Does this make sense?
Mariia Kerimova
07/12/2021, 9:08 PM
ale
07/12/2021, 9:16 PM