Lars Corneliussen
09/23/2020, 12:41 PM
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
Anyone?
as
09/23/2020, 12:51 PM
from prefect import task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

jsonresult = task(
    get_json_fun,
    result=LocalResult(serializer=JSONSerializer()),
    target='/path/result.json',
    checkpoint=True,
)(p)
Is jsonresult still held in memory in addition to being written to result.json, or not?
I'm worried that my machine will run out of memory as my flow gets bigger, with many task results floating around.
Zach
09/23/2020, 3:34 PM
query {
  flow_run_state {
    flow_run {
      id,
      parameters,
      created,
      flow(where: {name: "my-flow-name"}) {
        name,
        id
      },
    }
  }
}
Error:
{
  "graphQLErrors": [
    {
      "message": "Unknown argument \"where\" on field \"flow\" of type \"flow_run\".",
      "extensions": {
        "code": "GRAPHQL_VALIDATION_FAILED"
      }
    }
  ],
  "networkError": null,
  "message": "GraphQL error: Unknown argument \"where\" on field \"flow\" of type \"flow_run\"."
}
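Possibly relevant, as a hedged sketch only: Hasura-style schemas usually take where on the root field and filter nested relations inside the argument, rather than on the nested field. Untested, reusing the fields from the query above, sent through Prefect's Python client:

from prefect import Client

# a sketch, not a verified fix: filter on the flow relationship
# from the flow_run root instead of on the nested flow field
client = Client()
result = client.graphql(
    """
    query {
      flow_run(where: {flow: {name: {_eq: "my-flow-name"}}}) {
        id
        parameters
        created
        flow {
          name
          id
        }
      }
    }
    """
)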
Oleg
09/23/2020, 4:33 PM
[server]
host = "ip"
port = "8081"
url = "https://${server.host}:${server.port}"
[ui]
host = "<http://ip>"
port = "8081"
host_port = "8081"
endpoint = "${ui.host}:${ui.port}"
And after checking prefect.config in Python, I see my custom properties:
<Box: {'debug': False, 'home_dir': '/home/o.ilinsky/.prefect', 'backend': 'server', 'server': {'host': 'ip', 'port': 8081, 'host_port': 4200, 'endpoint': 'ip:8081', 'database': {'host': 'localhost', 'port': 5432, 'host_port': 5432, 'name': 'prefect_server', 'username': 'prefect', 'password': 'test-password', 'connection_url': 'postgresql://prefect:test-password@localhost:5432/prefect_server', 'volume_path': '/home/o.ilinsky/.prefect/pg_data'}, 'graphql': {'host': '0.0.0.0', 'port': 4201, 'host_port': 4201, 'debug': False, 'path': '/graphql/'}, 'hasura': {'host': 'localhost', 'port': 3000, 'host_port': 3000, 'admin_secret': '', 'claims_namespace': 'hasura-claims', 'graphql_url': 'http://localhost:3000/v1alpha1/graphql', 'ws_url': 'ws://localhost:3000/v1alpha1/graphql', 'execute_retry_seconds': 10}, 'ui': {'host': 'http://localhost', 'port': 8080, 'host_port': 8080, 'endpoint': 'http://localhost:8080', 'graphql_url': 'http://localhost:4200/graphql'}, 'telemetry': {'enabled': True}, 'url': 'https://ip:8081'}, 'cloud': {'api': 'ip:8081', 'endpoint': 'https://api.prefect.io', 'graphql': 'ip:8081/graphql', 'use_local_secrets': True, 'heartbeat_interval': 30.0, 'check_cancellation_interval': 15.0, 'diagnostics': False, 'logging_heartbeat': 5, 'queue_interval': 30.0, 'agent': {'name': 'agent', 'labels': [], 'level': 'INFO', 'auth_token': '', 'agent_address': '', 'resource_manager': {'loop_interval': 60}}}, 'logging': {'level': 'INFO', 'format': '[%(asctime)s] %(levelname)s - %(name)s | %(message)s', 'log_attributes': [], 'datefmt': '%Y-%m-%d %H:%M:%S', 'log_to_cloud': False, 'extra_loggers': []}, 'flows': {'eager_edge_validation': False, 'run_on_schedule': True, 'checkpointing': False, 'defaults': {'storage': {'add_default_labels': True, 'default_class': 'prefect.environments.storage.Local'}}}, 'tasks': {'defaults': {'max_retries': 0, 'retry_delay': None, 'timeout': None}}, 'engine': {'executor': {'default_class': 'prefect.engine.executors.LocalExecutor', 'dask': {'address': '', 'cluster_class': 'distributed.deploy.local.LocalCluster'}}, 'flow_runner': {'default_class': 'prefect.engine.flow_runner.FlowRunner'}, 'task_runner': {'default_class': 'prefect.engine.task_runner.TaskRunner'}}, 'ui': {'host': 'http://ip', 'port': 8081, 'host_port': 8081, 'endpoint': 'http://ip:8081', 'graphql_url': 'http://ip:4200'}}>
But the server starts with the localhost:8080 option:
Visit http://localhost:8080 to get started
And nothing is listening on port 8081.
I also tried setting the env variable
export PREFECT__USER_CONFIG_PATH=/home/o.ilinsky/.prefect/config.toml
But that doesn't work either.
Do you know what I'm doing wrong?
P.S. I'm new to Prefect, so it may be a silly error :)
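Possibly relevant: in the config dump above the custom ports landed under a top-level ui key, while server.ui (which the "Visit ..." message seems to echo) still shows the 8080 defaults. A hedged sketch of mirroring the values there instead, untested:

# a sketch, not a verified fix: put the custom UI ports under [server.ui],
# the section that still holds the 8080 defaults in the dump above
[server.ui]
host = "http://ip"
port = 8081
host_port = 8081
endpoint = "http://ip:8081"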
Alex
09/23/2020, 4:43 PM
Johnny
09/23/2020, 5:04 PM
Dolor Oculus
09/23/2020, 6:32 PM
def test_configuration_works_as_expected(monkeypatch, tmp_path):
    config_file = tmp_path / "config.toml"
    config_text = """
    environment = "prod"
    user = "${environments.${environment}.user}"

    [environments]
        [environments.dev]
            user = "test"
        [environments.prod]
            user = "admin"
    """
    with config_file.open("w") as cf:
        cf.write(config_text)
    # point the env var at the file itself, not the directory
    monkeypatch.setenv("PREFECT__USER_CONFIG_PATH", str(config_file))
    assert config.environment == "prod"
    assert config.user == "admin"
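One caveat that may matter here: prefect.config is built when prefect is first imported, so an env var set inside the test body arrives too late. A hedged sketch of a workaround, assuming prefect.configuration exposes a module-level config as it does in 0.x:

import importlib

import prefect.configuration

# after monkeypatch.setenv(...), rebuild the module-level config so the
# env var is actually read; prefect.config was constructed at first import
config = importlib.reload(prefect.configuration).config
assert config.environment == "prod"
assert config.user == "admin"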
Arsenii
09/24/2020, 6:14 AM
psimakis
09/24/2020, 8:59 AM
parameters_default arguments. The problem is that the third scheduler never triggers a flow run, and its parameters_default arguments are never passed to any flow run. Is this some kind of conflict between the second and the third scheduler (same time)? I'm pretty sure that I have misunderstood how scheduling works.
Thanks in advance
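For comparison, a hedged sketch of a multi-clock schedule with per-clock defaults in Prefect 0.x, where the clock kwarg is parameter_defaults (cron strings and values below are invented). Two clocks firing at the same instant produce runs with the same scheduled time, which may be where the conflict comes from:

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# three clocks; the second and third fire at the same time on purpose
schedule = Schedule(clocks=[
    CronClock("0 8 * * *", parameter_defaults={"source": "a"}),
    CronClock("0 9 * * *", parameter_defaults={"source": "b"}),
    CronClock("0 9 * * *", parameter_defaults={"source": "c"}),
])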
Lukas
09/24/2020, 12:41 PM
FlowRunTask based on the result of a previous task. To be precise:
Let's say I have a task that needs to fetch data from an API but hits the API request limit. I want the task to return the data it was able to fetch, but also some sort of flag showing that it couldn't fetch all the data because the API limit was hit, e.g. a dict {"data": data, "all_data_fetched": False}. Now if all_data_fetched is false, I'd like to start a new flow that continues fetching the data one hour later, since that's how long it takes until the API limit is reset. I want to create a new flow since we're running on Fargate, and I don't want to let the task wait for an hour and pay the whole time. And at the same time I want the rest of my flow to continue processing the data that the task was able to fetch from the API. I hope a) it's somewhat understandable what I want to achieve, b) it's possible, and c) there is someone out there who can give me a hint on how to implement it. My biggest issue is that I don't know how to access the result of a task within a flow to check whether I need to trigger the FlowRunTask or not. AFAIK I cannot run FlowRunTask from within a task, right? TIA 🙂
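A hedged sketch of one shape this could take in Prefect 0.x, gating a FlowRunTask inside the flow with the case control-flow task; flow and project names are invented, and the one-hour delay is left out:

from prefect import Flow, task
from prefect.tasks.control_flow import case
from prefect.tasks.prefect import FlowRunTask

@task
def fetch_data():
    # returns partial data plus a completeness flag, as described above
    return {"data": [], "all_data_fetched": False}

@task
def all_fetched(result):
    return result["all_data_fetched"]

@task
def process(result):
    # downstream processing continues regardless of the flag
    ...

continue_fetch = FlowRunTask(flow_name="continue-fetching", project_name="my-project")

with Flow("fetch") as flow:
    result = fetch_data()
    process(result)
    with case(all_fetched(result), False):
        continue_fetch()  # only fires when the fetch was incomplete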
kiran
09/24/2020, 3:16 PM
Chris Vrooman
09/24/2020, 3:40 PM
Ben Fogelson
09/24/2020, 4:50 PM
Result instance or a raw value? The functionality I'm looking for is basically this:
from prefect import task
from prefect.engine.result import Result

@task
def foo(bar):
    if isinstance(bar, Result):
        bar = bar.read(bar.location).value
    return do_something(bar)
Clearly the above isn't hard to add, but it also seems like the kind of thing where (a) there might be hidden gotchas and (b) it seems common enough that prefect might have something built in
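If nothing built in turns up, a hedged sketch of factoring the unwrapping out of individual tasks, reusing the read(location).value pattern from the snippet above:

from functools import wraps

from prefect.engine.result import Result

def unwrap_results(fn):
    # unwrap any Result arguments before the task body runs
    @wraps(fn)
    def wrapper(*args, **kwargs):
        unwrap = lambda v: v.read(v.location).value if isinstance(v, Result) else v
        args = [unwrap(a) for a in args]
        kwargs = {k: unwrap(v) for k, v in kwargs.items()}
        return fn(*args, **kwargs)
    return wrapper

# usage: apply beneath @task, e.g.
# @task
# @unwrap_results
# def foo(bar):
#     return do_something(bar)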
Jeremy Knickerbocker
09/24/2020, 7:07 PM
Thomas Borgen
09/24/2020, 7:08 PM
sark
09/25/2020, 4:57 AM
~/.docker/config.json, but does this mean that for flows I have to pack the credentials into the image, instead of being able to mount a volume when the container is run?
I have looked at the API documentation and couldn't find any options for specifying volumes for flow containers
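One possibly relevant lead, hedged because it depends on the Prefect version: the Docker agent (rather than the flow's storage) appears to accept host volume mounts, e.g.:

from prefect.agent.docker import DockerAgent

# hypothetical paths; mounts the host docker config into each flow container,
# assuming this Prefect version's DockerAgent supports the `volumes` argument
DockerAgent(
    volumes=["/home/me/.docker/config.json:/root/.docker/config.json"],
).start()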
09/25/2020, 7:18 AMwith Flow('flow-a') as flowA:
paramsFlowA = Parameter('flow-a-param', default=...)
< Does something here >
with Flow('flow-b') as flowB:
paramsFlowB = Parameter('flow-b-param', default=...)
< Does something here>
with Ffow('combined-flow') as combined_flow:
flow_a = FlowRunTask(flow_name='flow-a', project_name='test')
flow_b = FlowRunTask(flow_name='flow-b', project_name='test)
result = flow_b(upstream_tasks[flow_a])
When I deploy the combined_flow to the server, I can't set the parameters for FlowA and FlowB. Is it possible to do so, and how?
Thanks!
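A hedged sketch of what may work here: FlowRunTask takes a parameters dict that is handed to the child flow run (values invented):

from prefect.tasks.prefect import FlowRunTask

flow_a = FlowRunTask(flow_name='flow-a', project_name='test',
                     parameters={'flow-a-param': 'value-a'})
flow_b = FlowRunTask(flow_name='flow-b', project_name='test',
                     parameters={'flow-b-param': 'value-b'})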
sark
09/25/2020, 7:20 AM
return like this:
@task
def start_job():
    return job_id
I was wondering if it was possible to do the same for a container task created with CreateContainer.
In my case what I am trying to do is to return the job id of the job started by that container, so that another task can use it to stop the job
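A hedged sketch, worth checking against the docs: CreateContainer's output appears to be the created container's id, so it can be passed downstream like any other task result (image name invented):

from prefect import Flow
from prefect.tasks.docker import CreateContainer, StartContainer, StopContainer

create = CreateContainer(image_name="my-image")
start = StartContainer()
stop = StopContainer()

with Flow("container-flow") as flow:
    container_id = create()  # the created container's id is the task output
    started = start(container_id=container_id)
    stop(container_id=container_id, upstream_tasks=[started])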
ale
09/25/2020, 7:29 AM
sark
09/25/2020, 9:06 AM
upstream_tasks=[upstream_task] only seems to work when there are no data dependencies?
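For reference, a hedged sketch of mixing the two dependency styles in one call, which is expected to work in 0.x (task names invented):

from prefect import Flow, task

@task
def task_a():
    return 1

@task
def task_b(x):
    return x + 1

@task
def task_c(y):
    return y * 2

with Flow("example") as flow:
    a = task_a()
    b = task_b(x=a)                      # data dependency
    c = task_c(y=b, upstream_tasks=[a])  # extra state-only dependency on a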
Eric
09/25/2020, 7:40 PM
Eric
09/25/2020, 7:40 PM
Eric
09/25/2020, 7:41 PM
Eric
09/25/2020, 7:41 PM
Eric
09/25/2020, 9:29 PM
Manuel Mourato
09/25/2020, 9:30 PM
Rahat Zaman
09/25/2020, 11:18 PM
Rahat Zaman
09/25/2020, 11:20 PM
yield result for B. B will start when it gets the first yield from A
Eric
09/25/2020, 11:53 PM
Newskooler
09/26/2020, 10:33 AM