Ralph Willgoss
10/02/2020, 2:55 PM
[2020-10-02 14:32:01] INFO - (13104) task_runner.run | Task 'calculate_payoffs': finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'calculate_payoffs': finished task run for task with final state: 'Success'
Hui Zheng
10/02/2020, 4:43 PM
prefect context
. It seems a powerful way to share common variables and objects among tasks. Is it possible to add new objects or variables inside the flow definition which can then be used by the tasks later?
https://docs.prefect.io/core/concepts/execution.html#modifying-context-at-runtime
such as
with Flow('Flow with new context') as flow:
    # Could we add new objects to prefect.context here?
    task_1_use_new_context_object()
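For reference, the linked docs page shows extra context being supplied around flow.run() rather than inside the Flow block. A minimal sketch along those lines, assuming the Prefect 1.x API (the key name my_key is illustrative):
import prefect
from prefect import task, Flow

@task
def task_1_use_new_context_object():
    # read a key supplied at runtime; falls back to a default if absent
    return prefect.context.get("my_key", "default-value")

with Flow('Flow with new context') as flow:
    task_1_use_new_context_object()

# extra context is injected around the run, not inside the Flow definition
with prefect.context(my_key="hello"):
    flow.run()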
Hammad A
10/02/2020, 4:55 PM
Vincent
10/02/2020, 6:19 PM
Brett Naul
10/02/2020, 6:31 PM
Kyle Pierce
10/02/2020, 6:46 PM
TypeError: 'FunctionTask' object is not iterable
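A common cause of this error, though not necessarily the one here, is unpacking a task's result inside the Flow block; in Prefect 1.x a task that returns several values needs nout set before it can be unpacked. A minimal sketch, assuming that scenario:
from prefect import task, Flow

@task(nout=2)  # without nout, unpacking raises TypeError: 'FunctionTask' object is not iterable
def split():
    return 1, 2

@task
def add(x, y):
    return x + y

with Flow("unpack-example") as flow:
    a, b = split()
    total = add(a, b)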
I don't know if I'm overcomplicating this.
Eric
10/04/2020, 6:28 PM
@task(name="DF", log_stdout=True)
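For context, log_stdout=True tells Prefect 1.x to capture a task's print output and route it through the task's logger. A minimal sketch of how a decorator like the one above is typically used (the function body is illustrative):
from prefect import task, Flow

@task(name="DF", log_stdout=True)
def build_df():
    # with log_stdout=True, this print output appears in the Prefect logs
    print("building the dataframe...")
    return [1, 2, 3]

with Flow("df-example") as flow:
    build_df()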
ms16
10/05/2020, 12:20 AM
Max Lei
10/05/2020, 12:45 AM
import logging.config

def setup_logs():
    # Example config (the 'version' key and handler definitions are elided here)
    logging_config = {
        'root': {
            'level': 'DEBUG',
            'handlers': [
                'console',
                'info_file_handler',
                'fluent'
            ]
        }
    }
    logging.config.dictConfig(logging_config)
class DoSomething():
    def run(self):
        setup_logs()
        logger.info("Here")
The logging configuration sets up a default logger to stdout, a file handler, and a third-party log streaming service using fluent. I see the local file output, but not in the Prefect UI.
export PREFECT__LOGGING__EXTRA_LOGGERS="['root']"
[logging]
extra_loggers = "['root']"
I have set up a toml file and tried to set the environment variable before running the agent and the flow itself. However, I still do not see the logs in the Prefect UI.
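For reference, extra_loggers in Prefect 1.x attaches Prefect's log handlers to the named Python loggers so their records reach the UI, and the documented examples use named loggers rather than root. A minimal sketch, assuming a module-level logger whose name (my_module, illustrative) matches the extra_loggers entry:
import logging
from prefect import task

# the name must also appear in the config, e.g.
#   export PREFECT__LOGGING__EXTRA_LOGGERS="['my_module']"
logger = logging.getLogger("my_module")

@task
def do_something():
    # forwarded to the Prefect UI once "my_module" is listed in extra_loggers
    logger.info("Here")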
Is there something else I need to do?
Nakul Gowdra
10/05/2020, 2:31 AM
Newskooler
10/05/2020, 10:24 AM
Robin
10/05/2020, 11:52 AM
Sven Teresniak
10/05/2020, 12:14 PM
Parameter
. I cannot use slug or tags because I cannot set them to parameter values (or can I?).
I'm still working on a Lock-like ResourceManager, but that's very difficult when it comes to scheduling-/parameter-dependent locking.
Creating a Lock on constants (e.g. a constant string) seems rather easy.
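For what it's worth, Prefect 1.x's resource_manager decorator provides a setup/cleanup pair that could wrap such a lock. A minimal sketch keyed on a constant string, where acquire_lock and release_lock are hypothetical placeholders for whatever locking backend is used:
from prefect import Flow, task, resource_manager

@resource_manager
class NamedLock:
    def __init__(self, key: str):
        self.key = key

    def setup(self):
        # acquire_lock is a hypothetical helper (e.g. backed by Redis or a database)
        return acquire_lock(self.key)

    def cleanup(self, handle):
        # release_lock is likewise hypothetical
        release_lock(handle)

@task
def do_work(lock_handle):
    ...

with Flow("locked-flow") as flow:
    with NamedLock(key="my-constant-lock") as lock:
        do_work(lock)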
What I need (and am trying to build) is tag concurrency limits for the standalone version. :)
ale
10/05/2020, 12:18 PM
itay livni
10/05/2020, 12:27 PM
ale
10/05/2020, 1:49 PM
FargateTaskEnvironment(
    taskRoleArn=ETL_TASK_ROLE_ARN,  # ARN of the task role
    executionRoleArn=ETL_EXECUTION_ROLE_ARN  # ARN of the execution role
)
But I get the following error back from the Agent:
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Fargate requires task definition to have execution role ARN to support ECR images
Any suggestions?
Hui Zheng
10/05/2020, 5:28 PM
Kyle Pierce
10/05/2020, 6:10 PM
Tenzin Choedak
10/05/2020, 6:38 PM
Newskooler
10/05/2020, 7:15 PM
Hui Zheng
10/05/2020, 11:05 PM
get_timestamp()
to init_flow_run()
. It runs fine with Docker storage locally, but after I pushed the new flow to Prefect Cloud, I encountered this error:
Unexpected error while running flow: KeyError('Task slug init_flow_run-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
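As the error text suggests, Prefect Cloud only learns a flow's new task slugs when the flow is registered again after a change. A minimal sketch (the project name is illustrative), assuming Prefect 1.x with Docker storage:
# after changing the flow (e.g. renaming a task), rebuild storage and re-register
# so the backend's stored task slugs match; build=True is the default
flow.register(project_name="my-project", build=True)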
Bridget Pelletier-Ross
10/06/2020, 5:37 AM
gunarevuri
10/06/2020, 6:15 AM
client.create_project("my_first_flow")
to create it." And here is my basic code:
from prefect import task, Flow
@task
def hello_world():
    return "hello prefect"

@task
def prefect_say(s: str):
    print(s)

with Flow("my_first_flow") as f:
    r = hello_world()
    s = prefect_say(r)

# f.run()
f.visualize()
f.register("my_first_flow")
Alfie
10/06/2020, 11:03 AM
Darren Forsdike
10/06/2020, 12:29 PM
Jacques
10/06/2020, 2:04 PM
LocalDaskExecutor(scheduler="threads")
for running one-off flows, then started having issues with tasks being repeated and were advised to switch to DaskExecutor
, which we've been using with DaskExecutor(cluster_kwargs={"processes": False})
. We've now run into some new bugs with the latest Prefect (a side note, but here: https://github.com/PrefectHQ/prefect/issues/3443). We're trying to understand: what is the difference between LocalDaskExecutor
and DaskExecutor
using threads and a temporary cluster? Is there a performance/reliability advantage of using one over the other?
Benjamin Filippi
10/06/2020, 3:53 PM
Mitchell Bregman
10/06/2020, 4:27 PM
DaskKubernetesEnvironment
. Perhaps we can use DaskKubernetesEnvironment
for all of our flows. Upon registering a new flow (as shown below) to the cloud and submitting a Quick Run, we get the error:
Kubernetes Error: Back-off pulling image
When not specifying the DaskKubernetesEnvironment
, registering, deploying, and flow execution all work just fine. Here is a sample flow that I am trying to use Dask for:
with Flow("test-flow") as flow:
numbers = numbers_task()
first_map = map_task.map(numbers)
second_map = map_task.map(first_map)
reduction = reduce_task(second_map)
flow.storage = Docker(
registry_url="<http://parkmobile-docker.jfrog.io|parkmobile-docker.jfrog.io>",
image_name="test-flow",
image_tag="0.0.1"
)
flow.environment = DaskKubernetesEnvironment(min_workers=2, max_workers=4)
flow.register("test")
Any ideas as to why the DaskKubernetesEnvironment is throwing off the flow execution?
Jonas Hanfland
10/06/2020, 4:43 PM
BigQueryTask
.
This is the task I am trying to run:
BigQueryTask(
    name="Get all statement IDs",
    query="""
        SELECT * FROM temporary_tables.statements
    """,
    max_retries=3,
    retry_delay=timedelta(seconds=30),
    to_dataframe=True,
)
and it throws this exception:
(see thread)
Every time this task has run in the past, it has passed just fine.
Does anybody have an idea what might be the issue? Thanks very much in advance!
ale
10/06/2020, 4:53 PM