Didier Marin
07/29/2021, 1:28 PM
Christian
07/29/2021, 1:40 PM
Leon Kozlowski
07/29/2021, 2:55 PM
Brad I
07/29/2021, 3:13 PM
GCP_CREDENTIALS secret in Prefect Cloud? Or will it just use the service account tied to the namespace? The latter would be nice to avoid creating any long lived service account keys.
Rutvik Patel
07/29/2021, 3:24 PM
Hilary Roberts
07/29/2021, 3:48 PM
Pedro Machado
07/29/2021, 4:02 PM
Dan Zhao
07/29/2021, 4:12 PM
Anze Kravanja
07/29/2021, 4:42 PM
for t in flow.tasks:
    tr = state.result.get(t, None)
    if not tr: continue
    flow.logger.info(f"Task name '{t.name}' -> State '{tr.message}' -> Is failed: {tr.is_failed()}")
    if tr.is_failed():
        err_params['tasks'].append(OrderedDict({
            'taskName': t.name,
            'errorMessage': tr.message,
            'errorParams': tr.result if isinstance(tr.result, dict) else str(tr.result)
        }))
Basically I am just going through all the tasks and checking whether is_failed() is true; if so, I grab some info.
This all works as intended locally, but when I package my flows in a Docker image and run with a Docker agent, it turns out state.result is an empty dictionary, whereas locally I found each task's result there.
I've played with GCSResult and with leaving it at the default, but in both cases, while running in Docker, state.result = {}.
Any ideas what I might be doing wrong?
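A minimal sketch of how a loop like this is usually attached as a flow-level state handler in Prefect 1.x (the flow name is a placeholder; as described above, with some agents the state-to-task dict can still come back empty):
from prefect import Flow

def report_failed_tasks(flow, old_state, new_state):
    # new_state.result maps each task to its final State when the flow runner
    # populates it (e.g. a local flow.run()).
    if new_state.is_finished():
        for t, tr in (new_state.result or {}).items():
            if tr.is_failed():
                flow.logger.info(f"Task '{t.name}' failed: {tr.message}")
    return new_state

flow = Flow("example", state_handlers=[report_failed_tasks])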
Kyle McChesney
07/29/2021, 4:58 PM
Parameter validation failed:
Unknown parameter in input: "null", must be one of: capacityProviderStrategy, cluster, count, enableECSManagedTags, enableExecuteCommand, group, launchType, networkConfiguration, overrides, placementConstraints, placementStrategy, platformVersion, propagateTags, referenceId, startedBy, tags, taskDefinition
This is coming from the boto3 call triggered here: https://github.com/PrefectHQ/prefect/blob/6b59d989dec33aad8c62ea2476fee519c32f5c63/src/prefect/agent/ecs/agent.py#L320
My agent is being started like so (with prefecthq/prefect:0.14.13-python3.8, maybe this is old?):
prefect agent ecs start --run-task-kwargs s3://$bucket/run_task_kwargs.yml -a https://my-api:4200/graphql
My run task kwargs YAML has cluster, launchType and networkConfiguration. I also attempted to provide values in the run config via the UI, but it did not seem to change much (added CPU, memory, task and exec role). Any help is much appreciated!
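For reference, a rough sketch of the shape boto3's run_task expects for those keys, written as the Python dict the run_task_kwargs YAML should parse to (the cluster name, subnet and security group IDs below are placeholders):
run_task_kwargs = {
    "cluster": "my-cluster",
    "launchType": "FARGATE",
    "networkConfiguration": {
        "awsvpcConfiguration": {
            "subnets": ["subnet-0123456789abcdef0"],
            "securityGroups": ["sg-0123456789abcdef0"],
            "assignPublicIp": "ENABLED",
        }
    },
}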
chicago-joe
07/29/2021, 5:25 PM
Tim Enders
07/29/2021, 5:45 PM
[2021-07-29 12:41:09-0500] INFO - prefect.LocalDaskExecutor | Attempting to interrupt and cancel all running tasks...
Philip MacMenamin
07/29/2021, 6:02 PM
with Flow("example") as f:
    big_obj = gen_obj1()
    big_obj2 = gen_obj2(big_obj)  # never need to use big_obj again, want to reclaim memory
    big_obj3 = gen_obj3(big_obj2)  # finished with big_obj2
    ...  # etc
what's the canonical way of freeing up these objects within the flow, once they've been consumed and are no longer needed?
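One possible workaround (a sketch, not necessarily the canonical answer): persist each large output inside the task and pass only a lightweight reference downstream, so the big object itself can be garbage-collected when the task returns. build_big_thing and the path below are placeholders.
import pickle
from prefect import task

@task
def gen_obj1() -> str:
    big_obj = build_big_thing()  # placeholder for the expensive build
    path = "/tmp/big_obj.pkl"
    with open(path, "wb") as f:
        pickle.dump(big_obj, f)
    return path  # only the small path is kept as the task result

@task
def gen_obj2(path: str) -> str:
    with open(path, "rb") as f:
        big_obj = pickle.load(f)
    ...  # use big_obj, then persist the next result the same way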
YD
07/29/2021, 6:48 PM
hello_world.py
the Prefect.io server runs on a VM on AWS, with Python 3.6 and the latest prefect Python package
the agent is running:
agent
LocalAgent
LAST QUERY
11:19am | 1 seconds ago
CORE VERSION
0.15.3
when running hello_world.py using Python 3.6, the schedule does not work
when running it using Python 3.8, the schedule works, but I get errors
Why does the schedule not work with Python 3.6?
(It used to work... not sure what changed)
I tried pip install prefect --upgrade (both on the laptop and the VM)
killed the old agent (on the laptop)
prefect backend server (on the laptop)
prefect agent local start --label "<label>" (on the laptop)
this did not help
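For comparison, a minimal sketch of the kind of scheduled hello-world flow being described, assuming Prefect 1.x with a Server backend (flow name, project name and interval are placeholders; this does not by itself explain the Python 3.6 difference):
from datetime import timedelta
import pendulum
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

@task(log_stdout=True)
def say_hello():
    print("hello world")

schedule = IntervalSchedule(
    start_date=pendulum.now("UTC").add(minutes=1),  # must be timezone-aware and in the future
    interval=timedelta(hours=1),
)

with Flow("hello-world", schedule=schedule) as flow:
    say_hello()

flow.register(project_name="examples")  # placeholder project name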
Leon Kozlowski
07/29/2021, 7:20 PM
Michael Warnock
07/29/2021, 7:37 PM
How do I use .set_upstream to create a relationship between two tasks, where the downstream task doesn't care about the upstream's (in-band) results? The docs say it's possible, but I end up with two copies of the downstream task when doing what seems like the obvious.
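A sketch of the pattern that usually avoids the duplicate copy in Prefect 1.x (task and flow names are placeholders): call each task once inside the Flow context, then add a state-only dependency between the two call results instead of calling the downstream task a second time.
from prefect import Flow, task

@task
def upstream():
    ...

@task
def downstream():
    ...

with Flow("ordering-example") as flow:
    up = upstream()
    down = downstream()
    down.set_upstream(up)  # ordering only; no data is passed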
Ben Muller
07/29/2021, 10:09 PM
Hugo Shi
07/29/2021, 10:46 PM
YD
07/29/2021, 11:24 PM
Chris L.
07/30/2021, 7:42 AM
RUN prefect auth login --key ${PREFECT__CLOUD__API_KEY}
where `PREFECT__CLOUD__API_KEY` is an ARG. But I'm getting this error:
#13 1.047 Your API key is set in the Prefect config instead of with the CLI. To log in with the CLI, remove the config key `prefect.cloud.api_key`
jake lee
07/30/2021, 9:02 AM
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'common\'",)\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.',)
Again thank you so much!
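If the missing common package lives outside the flow file, one commonly suggested approach with Docker storage (a sketch; registry, image name and paths are placeholders) is to copy the package into the image and put it on PYTHONPATH so it is importable when the flow is unpickled:
from prefect.storage import Docker

storage = Docker(
    registry_url="my-registry.example.com",  # placeholder
    image_name="my-flow",                    # placeholder
    files={"/local/path/to/common": "/modules/common"},  # copy the package into the image
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},     # make it importable at unpickle time
)
flow.storage = storage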
Daniel Bast
07/30/2021, 2:03 PM
Arran
07/30/2021, 3:07 PM
files = [1, 2, 3]
data = extract_data.map(files)
upload = upload.map(data)
I would like to transform the above code so that file 1 will be extracted and uploaded before moving on to file 2. I know I could wrap this functionality into one function, but I would prefer to keep them separate.
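One possible way to keep the two tasks separate while forcing file 2's extract to wait for file 1's upload (a sketch, assuming extract_data and upload are the @task functions from the snippet above; whether this is the preferred Prefect idiom is an assumption): build the graph with a plain Python loop and explicit state dependencies instead of map.
with Flow("sequential-files") as flow:
    previous_upload = None
    for f in [1, 2, 3]:
        data = extract_data(f)
        if previous_upload is not None:
            data.set_upstream(previous_upload)  # ordering only: wait for the previous upload
        previous_upload = upload(data)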
Kyle McChesney
07/30/2021, 4:07 PM
What is the right way to use BatchSubmit and AWSClientWait in order to submit a job and wait for it?
with Flow('flow'):
    batch_input = Parameter('batch_input')
    job_res = BatchSubmit(
        job_name='job_name',
        job_definition='job-def',
        job_queue='job-queue',
    ).run(
        batch_kwargs={
            'parameters': {
                'batch_input': batch_input,
            },
        },
    )
    wait_res = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    ).run(
        waiter_kwargs={
            'jobs': [job_res]
        }
    )
This is what I have. Feels a bit weird to call .run directly on the task (I am new at this and have mostly just run @task annotated functions). Additionally, boto is complaining that the batch_input values need to be strings: type: <class 'prefect.core.parameter.Parameter'>, valid types: <class 'str'>
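A sketch of how these two tasks are more often wired up (assuming they are importable from prefect.tasks.aws as in recent 0.14/0.15 releases): instantiate them once, then call them inside the Flow context rather than calling .run() at build time, so the Parameter is resolved to its string value at runtime.
from prefect import Flow, Parameter
from prefect.tasks.aws import BatchSubmit, AWSClientWait

submit_job = BatchSubmit(job_name="job_name", job_definition="job-def", job_queue="job-queue")
wait_for_job = AWSClientWait(client="batch", waiter_name="JobComplete")

with Flow("flow") as flow:
    batch_input = Parameter("batch_input")
    # the Parameter nested in the dict is resolved when the flow runs
    job_id = submit_job(batch_kwargs={"parameters": {"batch_input": batch_input}})
    wait_for_job(waiter_kwargs={"jobs": [job_id]})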
Robert Hales
07/30/2021, 4:17 PM
test_reduce gets skipped but I would like for it to receive [1, 2, 4, 5]
Hugo Kitano
07/30/2021, 5:59 PM
Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n TypeError('an integer is required (got type bytes)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.7.5', currently running with '3.8.11')")
Steve Pamer
07/30/2021, 7:43 PM
Michael Warnock
07/30/2021, 8:50 PM
Is there a lag between config.toml and secrets added to it becoming available when doing local testing? That appears to be the case, since a missing local secret error went away seemingly magically, as I was preparing to ask about it. Alternatively, I had added the secret to cloud first, so maybe there was a (longer) lag before it found it there, and it's still not using my local-secrets?
Also, SlackTask doesn't work when supplying 'message' at runtime (or directly to run); I get TypeError: run() got multiple values for argument 'message' - something to do with that defaults_from_attrs magic?
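A guess at the SlackTask error (assuming the message was being passed positionally): with defaults_from_attrs, a positional value and the attribute default can both be forwarded to run, so passing message as a keyword usually avoids the collision. A minimal sketch:
from prefect import Flow
from prefect.tasks.notifications import SlackTask

notify = SlackTask()  # webhook URL comes from the SLACK_WEBHOOK_URL secret by default

with Flow("slack-example") as flow:
    notify(message="hello from the flow")  # keyword, not positional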
Fina Silva-Santisteban
07/30/2021, 9:02 PM
File 1:
def do_something_1(params_1):
    (does something)
File 2:
from path_to_file.file_1 import do_something_1

class SomethingTask(Task):
    def run(self, params_1):
        do_something_1(params_1)
        do_something_2
File 3:
import unittest
from unittest.mock import patch
from path_to_file.file_2.SomethingTask import SomethingTask

@patch('path_to_file.file_1.do_something_1')
class TestSomethingTask(unittest.TestCase):
    def test_1(self, mock_do_something_1):
        task = SomethingTask()
        task.run()
        self.assertTrue(mock_do_something_1.called)
Instead of mocking do_something_1() it actually calls do_something_1(), and strangely the assertion returns False.
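A likely explanation worth checking: patch targets the module where the name is looked up. Since file 2 imports do_something_1 into its own namespace, patching it on file_1 leaves the reference used by SomethingTask untouched. A sketch (module paths mirror the snippets above; the exact dotted path depends on the real layout):
import unittest
from unittest.mock import patch
from path_to_file.file_2.SomethingTask import SomethingTask

# Patch the name in the module where it is *used*, not where it is defined.
@patch('path_to_file.file_2.SomethingTask.do_something_1')
class TestSomethingTask(unittest.TestCase):
    def test_1(self, mock_do_something_1):
        task = SomethingTask()
        task.run(params_1="some value")  # placeholder argument
        self.assertTrue(mock_do_something_1.called)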
Kyle McChesney
07/30/2021, 9:35 PM
An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
I am running a flow via an ECS agent. It worked just fine until I specified a custom value for image when submitting the job via the UI. The image I specified was for an ECR image.