Jacob Blanco
11/10/2020, 9:33 AMwith Flow("Update Data") as flow:
last_export = get_last_export("my_table")
update_from = Parameter("update_from", default=last_export)
insert_query = parse_insert_query(update_from=update_from)
run_execute_query(insert_query)
This doesn’t work but I hope it illustrates what I’m trying to do. Is there any way to do this without having to write an intermediate task to “merge” the two inputs? I realize this pattern is wasteful by running get_last_export
for no reason if parameter is provided.Dor Menachem
11/10/2020, 3:47 PMDJ Erraballi
11/10/2020, 6:39 PM@task
def taskA() -> List[int]:
return results
with Flow() as flow:
param = Parameter('blah')
a_results = taskA()
flow_run_task = FlowRunTask()
flow_run_task.map(flow_name=unmapped('DownstreamFlow'), parameters=[{'blah': param, 'resultId': result} for result in a_results])
DJ Erraballi
11/10/2020, 6:49 PMDJ Erraballi
11/10/2020, 6:49 PMMark McDonald
11/10/2020, 9:04 PMMarley
11/10/2020, 9:20 PMNewskooler
11/10/2020, 9:22 PMJacob Blanco
11/11/2020, 3:25 AMEdison A
11/11/2020, 9:13 AM8080
? I have only seen a config example for the API port. Can you help me with a link to a page that contains all the possible config params for config.toml
?Greg Roche
11/11/2020, 10:05 AMflow.run()
, but registering the flow on Prefect Server and triggering the flow from there fails because of module errors. I guess I'm just misunderstanding python's importing logic (I'm by no means an expert...) so a basic working example that I can build on would really help me get off the ground.simone
11/11/2020, 3:53 PMprefect agent local start --api <http://172.22.0.5:4200>
I get IP from the terminal printout of the `prefect server start`command.
Using this setup I am able to run flows, however I am not able to catch logs.
I tested different server settings in the `config.toml`file but none fixed the issue:
[server]
[server.ui]
apollo_url="<http://0.0.0.0:4200/graphql>"
[server]
[server.ui]
apollo_url="<http://localhost:4200/graphql>"
If I understand how things work (please be patient I am a biologist 🙂 ) the issue is on my side and is caused by a mistake in the identification of the correct IP to use for setting up the system. If it is the case is there a way to predefine the IP where the services are starting? If I am wrong please let me know, any help is appreciated!
Below is the toy flow I have been using to test test logging
import prefect
from prefect import task, Flow, Parameter, flatten, unmapped
from prefect.engine.executors import DaskExecutor
from prefect.utilities.debug import raise_on_exception
from prefect.utilities.logging import get_logger
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
# MOCK TASK FUNCTION TO BUY TIME
@task(task_run_name=lambda **kwargs: f"testing-logger-writing-logs-{kwargs['x']}-suiname",log_stdout=True)
def wlog(x):
logger = prefect.context.get("logger")
logger.debug('i am debugging')
# logger = prefect_logging_setup('test')
<http://logger.info|logger.info>(f'start sleep')
time.sleep(20)
<http://logger.info|logger.info>(f'done sleep')
a = list(range(10))
# with Flow("test_running",schedule=schedule) as flow:
with Flow("logging-flow",environment=LocalEnvironment(DaskExecutor(address='<tcp://193.10.16.58:18938>'))) as flow:
logger = prefect.utilities.logging.get_logger()
<http://logger.info|logger.info>('this log is generated in the flow')
out_task = wlog.map(a)
<http://logger.info|logger.info>('done')
flow.register(project_name="test")
Robin
11/11/2020, 6:07 PMred button
to stop all running flows or tasks? 🔴
Our task concurrency limit indicates, that some tasks are still running, although no flow is running.
So I guess there are some zombie tasks hiding somewhere, that I would like to see stopped. 😮
I am exploring the interactive API, but haven't found a quick way to just cancel all tasks
...Joël Luijmes
11/11/2020, 6:21 PMfabian wolfmann
11/11/2020, 7:27 PMMark McDonald
11/11/2020, 10:30 PMM Taufik
11/12/2020, 5:19 AMjamodes
11/12/2020, 8:32 AMDJ Erraballi
11/12/2020, 8:37 AMDJ Erraballi
11/12/2020, 8:40 AM- prefect [required: ==0.11.5, installed: 0.11.5]
- click [required: >=7.0,<8.0, installed: 7.1.2]
- cloudpickle [required: >=0.6.0,<1.5, installed: 1.4.1]
- croniter [required: >=0.3.24,<1.0, installed: 0.3.36]
- natsort [required: Any, installed: 7.0.1]
- python-dateutil [required: Any, installed: 2.8.1]
- six [required: >=1.5, installed: 1.14.0]
- dask [required: >=0.19.3,<3.0, installed: 2.30.0]
- pyyaml [required: Any, installed: 5.3.1]
- distributed [required: >=1.26.1,<3.0, installed: 2.30.1]
- click [required: >=6.6, installed: 7.1.2]
- cloudpickle [required: >=1.5.0, installed: 1.4.1]
- contextvars [required: Any, installed: 2.4]
- immutables [required: >=0.9, installed: 0.14]
- dask [required: >=2.9.0, installed: 2.30.0]
- pyyaml [required: Any, installed: 5.3.1]
- msgpack [required: >=0.6.0, installed: 1.0.0]
- psutil [required: >=5.0, installed: 5.7.3]
- pyyaml [required: Any, installed: 5.3.1]
- setuptools [required: Any, installed: 45.2.0]
- sortedcontainers [required: !=2.0.1,!=2.0.0, installed: 2.3.0]
- tblib [required: >=1.6.0, installed: 1.7.0]
- toolz [required: >=0.8.2, installed: 0.11.1]
- tornado [required: >=5, installed: 6.1]
- zict [required: >=0.1.3, installed: 2.0.0]
- heapdict [required: Any, installed: 1.0.1]
- docker [required: >=3.4.1,<5.0, installed: 4.3.1]
- requests [required: >=2.14.2,!=2.18.0, installed: 2.25.0]
- certifi [required: >=2017.4.17, installed: 2020.4.5.1]
- chardet [required: >=3.0.2,<4, installed: 3.0.4]
- idna [required: >=2.5,<3, installed: 2.9]
- urllib3 [required: >=1.21.1,<1.27, installed: 1.26.1]
- six [required: >=1.4.0, installed: 1.14.0]
- websocket-client [required: >=0.32.0, installed: 0.57.0]
- six [required: Any, installed: 1.14.0]
- marshmallow [required: >=3.0.0b19,<3.6.1, installed: 3.5.1]
- marshmallow-oneofschema [required: >=2.0.0b2,<3.0, installed: 2.1.0]
- marshmallow [required: >=3.0.0rc6,<4.0.0, installed: 3.5.1]
- mypy-extensions [required: >=0.4.0,<1.0, installed: 0.4.3]
- pendulum [required: >=2.0.4,<3.0, installed: 2.1.2]
- python-dateutil [required: >=2.6,<3.0, installed: 2.8.1]
- six [required: >=1.5, installed: 1.14.0]
- pytzdata [required: >=2020.1, installed: 2020.1]
- python-box [required: >=3.4.4,<5.0, installed: 4.2.3]
- ruamel.yaml [required: Any, installed: 0.16.12]
- ruamel.yaml.clib [required: >=0.1.2, installed: 0.2.2]
- toml [required: Any, installed: 0.10.2]
- python-dateutil [required: ~=2.7, installed: 2.8.1]
- six [required: >=1.5, installed: 1.14.0]
- python-slugify [required: >=1.2.6,<5.0, installed: 4.0.1]
- text-unidecode [required: >=1.3, installed: 1.3]
- pytz [required: >=2018.7, installed: 2019.3]
- pyyaml [required: >=3.13,<5.4, installed: 5.3.1]
- requests [required: >=2.20,<3.0, installed: 2.25.0]
- certifi [required: >=2017.4.17, installed: 2020.4.5.1]
- chardet [required: >=3.0.2,<4, installed: 3.0.4]
- idna [required: >=2.5,<3, installed: 2.9]
- urllib3 [required: >=1.21.1,<1.27, installed: 1.26.1]
- tabulate [required: >=0.8.0,<1.0, installed: 0.8.7]
- toml [required: >=0.9.4,<1.0, installed: 0.10.2]
- urllib3 [required: >=1.24.3, installed: 1.26.1]
DJ Erraballi
11/12/2020, 8:40 AMDJ Erraballi
11/12/2020, 8:41 AMRobin
11/12/2020, 10:53 AMjamodes
11/12/2020, 11:33 AMDor Menachem
11/12/2020, 12:57 PMFelipe Netto
11/12/2020, 3:00 PMWalt Wells
11/12/2020, 3:27 PMJoël Luijmes
11/12/2020, 3:33 PMRodrigo Neves
11/12/2020, 3:51 PMMac Gréco Péralte Chéry
11/12/2020, 3:54 PMflow.environment = DaskCloudProviderEnvironment(
provider_class=FargateCluster,
# task_role_arn="arn:aws:iam::497427061914:role/ecsTaskRole",
execution_role_arn="arn:aws:iam::497427061914:role/ecsTaskRole",
n_workers=4,
scheduler_cpu=256,
scheduler_mem=512,
worker_cpu=512,
worker_mem=1024,
labels=["fargate"]
)