Jasono
10/27/2020, 9:08 PM
prefect.context. In my config.toml, a context variable is defined like foo = "202010". In the task definition, it's used like year = prefect.context.foo[:4], but it causes an "int object is not subscriptable" error. Is there a way to make it work without doing str(foo[:4])?
Hui Zheng
10/27/2020, 10:44 PM
task_A, which returns a list_A.
I want to create one task_B_i for each item i in list_A. However, there is a catch: those task_Bs need to be executed in sequence, that is, task_B_0 executes first, then task_B_1 follows, and then task_B_2 follows. If a task_B_i fails, all the subsequent task_Bs shall be skipped. Because those task_Bs can NOT be executed in parallel, it seems I cannot use map().
task_B_i returns an output (which is a string), item_b. So, after all task_Bs execute, I have a list_B of task_B_i outputs.
I then need to have a task_c, which takes list_B, the results of all task_Bs, to carry out some work, which is like a reduce operation over the upstream task_Bs.
I tried something like below, but it obviously fails, and I have no better ideas for now.
with Flow("dynamic tasks in sequence"):
    list_A = task_A()
    for item_a in list_A:
        item_b = task_B(item_a)
It failed with this error message:
  File "/app/flow.py", line 736, in <module>
    for item_a in list_A:
TypeError: 'FunctionTask' object is not iterable
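The TypeError arises because, inside the `with Flow(...)` block, `task_A()` returns a task object rather than the list it will produce at run time, so it cannot be iterated while the flow is being built. Below is a minimal plain-Python sketch of the ordering logic described above, assuming the whole sequence can live inside a single task body. `run_in_sequence` and the stand-in functions are hypothetical names for illustration, not Prefect API.

```python
from functools import reduce


def run_in_sequence(items, step):
    """Apply `step` to each item in order and stop at the first failure.

    Returns (outputs, failed_index); failed_index is None if every step succeeded.
    """
    outputs = []
    for index, item in enumerate(items):
        try:
            outputs.append(step(item))
        except Exception:
            # Everything after the failing item is skipped.
            return outputs, index
    return outputs, None


# Hypothetical stand-ins for task_A, task_B and task_c.
def task_a():
    return ["a", "b", "c"]


def task_b(item):
    if item == "fail":
        raise ValueError("task_B failed")
    return item.upper()


def task_c(list_b):
    # Reduce step: fold all task_B outputs into a single value.
    return reduce(lambda acc, s: acc + s, list_b, "")


list_b, failed_index = run_in_sequence(task_a(), task_b)
if failed_index is None:
    print(task_c(list_b))
```

Whether task_c should still run on the partial list_B after a failure is not stated in the question; the sketch returns the partial outputs together with the failing index so either choice is possible.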
Dean Magee
10/28/2020, 3:35 AM
for unique_client in function_that_returns_list_of_40+_parameter_dicts():
    flow.run(unique_client)
Surely there is a better way to run the same flow but with different parameters each time. Am I missing something?
Ralph Willgoss
10/28/2020, 9:37 AM
Simo Tumelius
10/28/2020, 12:08 PM
Vincent
10/28/2020, 3:28 PM
Youssef
10/28/2020, 5:06 PM
Philip MacMenamin
10/28/2020, 5:16 PM
josh
10/28/2020, 6:05 PM
0.13.13 has been released and here are a few notable changes:
⏰ FlowRunTask can now schedule runs in the future
🤝 Upgrades to Azure tasks
🛠️ Fixes to stdout logging
🌐 Added option for networkMode in Fargate Agent
🗃️ Fixed a caching bug
A big thank you to our contributors who helped out with this release! Full changelog:
Maura Drabik
10/28/2020, 6:22 PM
scm_validation.register(
    project_name="SCM Validation",
    build=True
)
It failed with this error message:
prefect.utilities.exceptions.ClientError: [{'path': ['project'], 'message': 'field "project" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.project', 'code': 'validation-failed', 'exception': {'message': 'field "project" not found in type: \'query_root\''}}}]
Cody Vandervoort
10/28/2020, 8:03 PM
Hui Zheng
10/28/2020, 9:56 PM
ShellTask(return_all=True)
https://github.com/PrefectHQ/prefect/issues/3584
Jeremy Knickerbocker
10/28/2020, 10:29 PM
Hui Zheng
10/29/2020, 1:19 AM
healthcheck step.
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: fetch_runnable_models> has retry settings but some upstream dependencies do not have result types. See https://docs.prefect.io/core/concepts/results.html for more details.
  result_check(flows)
Please see the thread for more details.
Marwan Sarieddine
10/29/2020, 3:24 AM
<Failed: "Unexpected error: AttributeError("partially initialized module 'prefect' has no attribute 'schedules' (most likely due to a circular import)")">
Wondering if anyone has encountered this before?
Zhiguo Yuan
10/29/2020, 4:44 AM
Zhiguo Yuan
10/29/2020, 4:44 AM
Newskooler
10/29/2020, 9:32 AM
psimakis
10/29/2020, 10:22 AM
schedules.Schedule(
    clocks=[
        clocks.IntervalClock(timedelta(hours=1)),  # clock 0: fire every hour
        clocks.IntervalClock(timedelta(hours=5)),  # clock 1: fire every five hours
    ],
    # but only on weekdays for the second clock
    filters=[filters.is_weekday_clock_1]
)
I know that filters are applied at the scheduler level, but it could be very handy to apply filtering at the clock level. Is there any (even hacky) way to achieve this? If it is not possible, is there any way to provide more than one scheduler to a flow?
Thanks in advance!
Christian
10/29/2020, 12:21 PM
Couldn't connect to Prefect Server at http://localhost:4200/graphql
Clemens
10/29/2020, 3:21 PM
ModuleNotFoundError.
Does anybody have any experience with how Dask handles these imports?
Алексей Филимонов
10/29/2020, 4:11 PM
Brian Mesick
10/29/2020, 5:35 PM
Nuno
10/29/2020, 5:45 PM
as_task but, even with globally scoped functions, it isn't working as expected.
Current code while testing:
import prefect
from prefect import task, Flow
from prefect.utilities.tasks import as_task
def hello_world():
    logger = prefect.context.get("logger")
    logger.info("Hello, World!")
flow = Flow("hello-flow", tasks=[as_task(hello_world)])
if __name__ == "__main__":
    flow.run()
Does anyone have a suggestion? Thank you.
George Coyne
10/29/2020, 6:33 PM
George Coyne
10/29/2020, 6:34 PM
prefect agent start docker -t TOKEN_HERE -l local_agent --volume $Env:GOOGLE_APPLICATION_CREDENTIALS:/home/service_account.json --env GOOGLE_APPLICATION_CREDENTIALS=/home/service_account.json
 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/
[2020-10-29 17:58:56,091] INFO - agent | Starting DockerAgent with labels ['local_agent']
[2020-10-29 17:58:56,091] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2020-10-29 17:58:56,092] INFO - agent | Agent connecting to the Prefect API at https://api.prefect.io
[2020-10-29 17:58:56,211] INFO - agent | Waiting for flow runs...
[2020-10-29 17:59:22,897] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-10-29 17:59:23,071] INFO - agent | Deploying flow run 7c642664-2e1d-4599-a0a2-c4c4f9496579
[2020-10-29 17:59:23,072] INFO - agent | Pulling image gcr.io/GCLOUD_PROJECT/flows/internal-data/aggasetl2:2020-10-27t22-42-21-305568-00-00...
[2020-10-29 17:59:24,540] ERROR - agent | Logging platform error for flow run 7c642664-2e1d-4599-a0a2-c4c4f9496579
[2020-10-29 17:59:24,846] ERROR - agent | Error while deploying flow: APIError(HTTPError('500 Server Error: Internal Server Error for url: http+docker://localnpipe/v1.40/images/create?tag=2020-10-27t22-42-21-305568-00-00&fromImage=gcr.io%2FPROJECT%2Fflows%2Finternal-data%2Faggasetl2'))
Edison A
10/29/2020, 9:13 PM
Arthur Duarte
10/29/2020, 9:38 PM
raise NotImplementedError("cannot instantiate %r on your system"
NotImplementedError: cannot instantiate 'WindowsPath' on your system
Any ideas?
Edison A
10/29/2020, 10:20 PM
flow.register('project_name')
  File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1623, in register
    registered_flow = client.register(
  File "venv/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1450, in serialize
    self.storage.add_flow(self)
  File "venv/lib/python3.8/site-packages/prefect/environments/storage/local.py", line 144, in add_flow
    flow_location = flow.save(flow_location)
  File "venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1519, in save
    cloudpickle.dump(self, f)
  File "venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 55, in dump
    CloudPickler(
  File "venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'weakref' object
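The traceback above ends with cloudpickle refusing to serialize a weakref held somewhere in the flow. One way to narrow down which attribute is responsible is to try pickling each attribute separately. The sketch below uses the standard library's pickle as a stand-in for cloudpickle (an assumption; the two differ in what they can serialize), and `find_unpicklable`, `Target`, and `holder` are hypothetical names for illustration.

```python
import pickle
import weakref
from types import SimpleNamespace


def find_unpicklable(obj):
    """Return {attribute_name: error_text} for attributes that fail to pickle."""
    problems = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle can raise several exception types
            problems[name] = f"{type(exc).__name__}: {exc}"
    return problems


class Target:
    """Hypothetical object that something else holds a weak reference to."""


target = Target()
# Hypothetical holder with one picklable and one unpicklable attribute.
holder = SimpleNamespace(name="example", ref=weakref.ref(target))
print(find_unpicklable(holder))
```

The same loop could be pointed at a task or flow object (anything with a `__dict__`) to see which attribute carries the weakref before calling register.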
Hui Zheng
10/29/2020, 11:50 PM