Samuel Hinton
02/03/2021, 1:56 PM
  _____  _____  ______ ______ ______ _____ _______
 |  __ \|  __ \|  ____|  ____|  ____/ ____|__   __|
 | |__) | |__) | |__  | |__  | |__ | |       | |
 |  ___/|  _  /|  __| |  __| |  __|| |       | |
 | |    | | \ \| |____| |    | |___| |____   | |
 |_|    |_|  \_\______|_|    |______\_____|  |_|
Thanks for using Prefect!!!
This is the official docker image for Prefect Core, intended for executing
Prefect Flows. For more information, please see the docs:
<https://docs.prefect.io/core/getting_started/installation.html#docker>
Samuel Hinton
02/03/2021, 2:46 PM
prefect server start locally (which launches fine and gets me to the Welcome to PREFECT SERVER ASCII splash screen). Navigating to localhost:8080 shows it connecting to the server (and pulling down the favicon and title), but the web page is empty (completely white). I can see it's got content and scripts in the HTML, but there's this error and nothing is ever made visible in the UI:
Mitchell Bregman
02/03/2021, 3:00 PM
Samuel Hinton
02/03/2021, 3:21 PM
prefect backend server
prefect server create-tenant --name default --slug default
And doing this does seem to fix the problem. Is there a way I can specify, in the config options or somewhere in the docker-compose file, that I’d like to ensure this tenant exists on server startup?
Verun Rahimtoola
02/03/2021, 6:30 PM
Amit
02/03/2021, 8:31 PM
Carter Kwon
02/03/2021, 10:04 PM
BK Lau
02/03/2021, 11:51 PM
lifecycle methods or hooks or handlers for flows/tasks that a user can override or register? Something analogous or similar to this idea (https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/)
Sam Peck
02/04/2021, 12:12 AM
.map.
When I run a flow like this:
with Flow("test flow") as flow:
    values = [1, 2, 3]
    task_1_results = task1.map(values)
    task_2_results = task2.map(task_1_results)
Any task1 results that raised a FAIL signal don’t get run by task2; instead, task2 aborts early with: Finished task run for task with final state: 'TriggerFailed'
However when I change this slightly to introduce a filter of some kind:
my_filter = FilterTask()
with Flow("test flow") as flow:
    values = [1, 2, 3]
    task_1_results = my_filter(task1.map(values))
    task_2_results = task2.map(task_1_results)
All task1 results get mapped onto task2, including those that failed.
I get the sense that I’ve misunderstood the functional API and that’s what’s tripping me up; however, I haven’t found anything in the docs that unlocks my intuition about why it would behave this way and how to modify that behavior. My first thought was to modify the filter to look out for failed results, but from stepping through in my debugger it doesn’t look like the trigger function gets any of that context.
Sanjay Patel
02/04/2021, 12:13 AM
Michael Wooley
02/04/2021, 6:13 AM
yield part of a task result for use downstream? If not, what is the best alternative? Or am I just thinking about this all wrong?
Joël Luijmes
02/04/2021, 10:41 AM
import prefect
from prefect import task, Flow, case
from prefect.tasks.control_flow import merge

@task
def check_condition():
    return True

@task
def log_output(val):
    prefect.context.logger.info(val)

with Flow("conditional-branches") as flow:
    cond = check_condition()
    with case(cond, False):
        task_true = log_output("Branch: true")
    task_end = log_output("Main branch end")
    task_end.set_upstream(merge(task_true))

flow.visualize()
flow.run()
Peter Roelants
02/04/2021, 11:10 AM
Samuel Hinton
02/04/2021, 11:21 AM
hello_flow example flow never executes. The agent startup seems fine:
agent_1 | [2021-02-04 11:16:51,742] INFO - agent | Starting LocalAgent with labels ['753b3ccf1df5']
agent_1 | [2021-02-04 11:16:51,742] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
agent_1 | [2021-02-04 11:16:51,742] INFO - agent | Agent connecting to the Prefect API at <http://apollo:4200>
And it's still querying the server (see image), but it never actually does anything. I note that manually starting an agent seems to work, but including an agent in the docker-compose file that launches all the other Prefect services does not, and the debug output just keeps stating that no flows are found, which confusingly disagrees with the UI.
agent_1 | [2021-02-04 11:34:48,152] DEBUG - agent | No flow runs found
agent_1 | [2021-02-04 11:34:48,153] DEBUG - agent | Next query for flow runs in 10.0 seconds
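One plausible cause worth checking, offered as a guess rather than a diagnosis: agents only pick up flow runs whose labels are all present on the agent, and the LocalAgent log above shows the containerized agent starting with just its container hostname ['753b3ccf1df5'] as a label. If the flow was registered from the host machine with local storage, it may carry the host's hostname as a label instead, in which case the agent in the container would keep reporting "No flow runs found". The sketch below models only that subset rule; it is not Prefect's query code, and the label "my-laptop" is hypothetical.

```python
from typing import Iterable


def agent_can_run(flow_labels: Iterable[str], agent_labels: Iterable[str]) -> bool:
    """Simplified model of label matching: a flow run is eligible for an
    agent only if every one of the flow's labels is also on the agent."""
    return set(flow_labels) <= set(agent_labels)


# Label from the agent log above (the container's hostname).
container_agent_labels = ["753b3ccf1df5"]
# Hypothetical label a flow might carry if registered from the host machine.
flow_labels_from_host = ["my-laptop"]

print(agent_can_run(flow_labels_from_host, container_agent_labels))  # False
print(agent_can_run(flow_labels_from_host, ["my-laptop", "753b3ccf1df5"]))  # True
```

If this is what is happening, giving the flow and the containerized agent a matching set of labels should let the run be picked up.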
Samuel Hinton
02/04/2021, 12:46 PM
Michael Hadorn
02/04/2021, 1:29 PM
Samuel Hinton
02/04/2021, 1:46 PM
Arnoldas Bankauskas
02/04/2021, 2:42 PM
PS C:\WINDOWS\system32> prefect backend server
Backend switched to server
PS C:\WINDOWS\system32> prefect server start
Traceback (most recent call last):
File "site-packages\docker\api\client.py", line 259, in _raise_for_status
File "site-packages\requests\models.py", line 941, in raise_for_status
requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: <http+docker://localnpipe/version>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "site-packages\docker\api\client.py", line 205, in _retrieve_server_version
File "site-packages\docker\api\daemon.py", line 181, in version
File "site-packages\docker\api\client.py", line 265, in _result
File "site-packages\docker\api\client.py", line 261, in _raise_for_status
File "site-packages\docker\errors.py", line 31, in create_api_error_from_http_exception
docker.errors.APIError: 500 Server Error: Internal Server Error ("b'open \\\\.\\pipe\\docker_engine_linux: The system cannot find the file specified.'")
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "docker-compose", line 3, in <module>
File "compose\cli\main.py", line 67, in main
File "compose\cli\main.py", line 123, in perform_command
File "compose\cli\command.py", line 69, in project_from_options
File "compose\cli\command.py", line 132, in get_project
File "compose\cli\docker_client.py", line 43, in get_client
File "compose\cli\docker_client.py", line 170, in docker_client
File "site-packages\docker\api\client.py", line 188, in __init__
File "site-packages\docker\api\client.py", line 213, in _retrieve_server_version
docker.errors.DockerException: Error while fetching server API version: 500 Server Error: Internal Server Error ("b'open \\\\.\\pipe\\docker_engine_linux: The system cannot find the file specified.'")
[6744] Failed to execute script docker-compose
Exception caught; killing services (press ctrl-C to force)
Any ideas on how to solve that?
Karolína Bzdušek
02/04/2021, 3:00 PM
Marwan Sarieddine
02/04/2021, 4:00 PM
Cole Howard
02/04/2021, 5:44 PM
Luis Gallegos
02/04/2021, 5:48 PM
query {
  database {
    name: prefect_server
    table: flow_run
    action: flush
    where: date <= '2020-12-31'
  }
}
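For comparison, here is a sketch of what the pseudo-query above could look like as a real GraphQL mutation, under stated assumptions: Prefect Server keeps its data in Postgres behind Hasura, and Hasura auto-generates delete_<table> mutations that take a where filter with operators such as _lt. Whether that mutation is reachable in a given deployment, and whether flow_run has a created column to filter on, are assumptions to verify first. The helper name is hypothetical, and it only builds the request body; it does not send anything.

```python
import json
from datetime import date

# Hasura-style delete mutation. The flow_run table comes from the pseudo-query
# above; the "created" column and reaching Hasura directly are assumptions.
MUTATION_TEMPLATE = """mutation {
  delete_flow_run(where: {created: {_lt: %s}}) {
    affected_rows
  }
}"""


def build_delete_request(cutoff: date) -> dict:
    """Hypothetical helper: build (but do not send) a GraphQL request body that
    deletes flow_run rows created strictly before `cutoff`."""
    # json.dumps yields a double-quoted string, which is also valid GraphQL.
    query = MUTATION_TEMPLATE % json.dumps(cutoff.isoformat())
    return {"query": query}


# An exclusive cutoff of 2021-01-01 covers everything dated 2020-12-31 or earlier.
body = build_delete_request(date(2021, 1, 1))
print(body["query"])
```

Using a strict "_lt" with the following day avoids the ambiguity of comparing a timestamp column against a bare date with "<=".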
Sean Talia
02/04/2021, 7:01 PM
config.toml – I'm just trying to figure out where in the development lifecycle the config.toml file is primarily used. I'm looking at this documentation on extra loggers and am wondering: if I want to have different loggers in use when different flows are running, and I'm using DockerRun for my run config, is the most appropriate thing to do to just change the PREFECT__LOGGING__EXTRA_LOGGERS env variable in my run config images?
Alvis Tang
02/04/2021, 7:34 PM
run_agent with parameters?
Vincent
02/04/2021, 8:00 PM
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'type': 'system', 'errno': 'EHOSTUNREACH', 'code': 'EHOSTUNREACH'}}}])
Traceback (most recent call last):
File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 98, in call_runner_target_handlers
state = self.client.set_task_run_state(
File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/client/client.py", line 1503, in set_task_run_state
result = self.graphql(
File "/opt/conda/envs/dev/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'request to <http://graphql:443/graphql/> failed, reason: connect EHOSTUNREACH 10.30.43.11:443', 'type': 'system', 'errno': 'EHOSTUNREACH', 'code': 'EHOSTUNREACH'}}}]
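Judging by the traceback, the client did get a response from the API, but the errors in that response report that a request to http://graphql:443/graphql/ failed with EHOSTUNREACH, so the failing hop appears to be a network connection to the host "graphql" on port 443. A quick way to separate networking problems from Prefect problems is a plain TCP connection test run from the environment that makes the failing request. The stdlib-only helper below is a generic sketch, not part of Prefect, and the helper name is hypothetical.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) opens within timeout."""
    try:
        # create_connection resolves the name and tries each returned address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Name-resolution failures, refusals, EHOSTUNREACH and timeouts
        # all surface as OSError subclasses.
        return False
```

For example, can_connect("graphql", 443) returning False from that environment would point at networking or service configuration (host name, port, Docker network) rather than at the flow code.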
Matt Denno
02/04/2021, 8:25 PM
@task(checkpoint=False)
def fetch() -> ProtoBufObject:
    return fetch_proto_buff_object()
In the past I have set checkpoint=False to avoid serialization errors with the protobuf objects, but now, even with checkpoint=False, I am getting the following when I try to register the flow.
TypeError: can't pickle google.protobuf.pyext._message.MessageDescriptor objects
To resolve this, I tried to create a serializer for the protobuf objects like so:
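from typing import Any
from prefect.engine.serializers import Serializer
import amanzi_pb2  # project-specific generated protobuf module

class PBSerializer(Serializer):
    def serialize(self, value: Any) -> bytes:
        # transform a Python object into bytes
        return value.SerializeToString()

    def deserialize(self, value: bytes) -> Any:
        # recover a Python object from bytes: ParseFromString fills an
        # instance in place and returns a byte count, not the message
        ts = amanzi_pb2.TimeSeries()
        ts.ParseFromString(value)
        return ts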
and called like:
@task(checkpoint=False, result=LocalResult(serializer=PBSerializer()))
def fetch() -> ProtoBufObject:
    return fetch_proto_buff_object()
But I get the same error still. It doesn't seem like the result serializer is being used, but it is also not causing any errors. I have tried to debug and step through the code but can't figure out what is happening.
Any help, guidance, ideas for how to resolve would be welcomed.
I am on v0.13.19.
Wenli Wan
02/04/2021, 9:00 PM
Task with log_stdout=True
2. add a FileHandler to the logger of the Task with the following code:
from logging import FileHandler
from prefect import Flow, Task

class MyTask(Task):
    def __init__(self, test_id, log_stdout=True, **kwargs):
        super().__init__(log_stdout=log_stdout, **kwargs)
        self.test_id = test_id
        fh = FileHandler(f"../logging_test/test_{test_id}.log")
        self.logger.addHandler(fh)

    def run(self):
        self.logger.info(f"An info message from test {self.test_id}.")
        print(f"test {self.test_id}")

Then run it as follows:
t1 = MyTask(1, name="test1", log_stdout=False)
t2 = MyTask(2, name="test2")
f = Flow("logging test")
f.add_task(t1)
f.add_task(t2)
f.run()
I found out that the stdout from the Task has been redirected to the logger of the TaskRunner instead of the logger of MyTask:
[2021-02-04 18:00:09-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'logging test'
[2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Flow 'logging test': Handling state change from Scheduled to Running
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test1': Starting task run...
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Handling state change from Pending to Running
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Calling task.run() method...
[2021-02-04 18:00:09-0500] INFO - prefect.test1 | An info message from test 1.
test 1
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test1': Handling state change from Running to Success
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test1': Finished task run for task with final state: 'Success'
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test2': Starting task run...
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Handling state change from Pending to Running
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Calling task.run() method...
[2021-02-04 18:00:09-0500] INFO - prefect.test2 | An info message from test 2.
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | test 2
[2021-02-04 18:00:09-0500] DEBUG - prefect.TaskRunner | Task 'test2': Handling state change from Running to Success
[2021-02-04 18:00:09-0500] INFO - prefect.TaskRunner | Task 'test2': Finished task run for task with final state: 'Success'
[2021-02-04 18:00:09-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-02-04 18:00:09-0500] DEBUG - prefect.FlowRunner | Flow 'logging test': Handling state change from Running to Success
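As a stdlib-only illustration (not Prefect's own mechanism), print() output can be routed to a specific logger by temporarily replacing sys.stdout with a small file-like object that forwards each line to that logger. Used inside a task's run() with the task's own self.logger as the target, a line like the "test 2" above would be emitted by that logger (and so reach its FileHandler) rather than by prefect.TaskRunner. LoggerWriter and run_with_stdout_to are hypothetical names, and the sketch assumes the task is built with log_stdout=False so Prefect's own redirection is not also in play.

```python
import contextlib
import logging
from typing import Any, Callable


class LoggerWriter:
    """Minimal file-like object: forwards each complete line to a logger."""

    def __init__(self, logger: logging.Logger, level: int = logging.INFO) -> None:
        self.logger = logger
        self.level = level
        self._buffer = ""

    def write(self, text: str) -> int:
        self._buffer += text
        # Emit one record per complete line and keep any trailing partial line.
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            if line:
                self.logger.log(self.level, line)
        return len(text)

    def flush(self) -> None:
        # Emit whatever partial line is left (e.g. from print(..., end="")).
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""


def run_with_stdout_to(logger: logging.Logger, func: Callable[..., Any],
                       *args: Any, **kwargs: Any) -> Any:
    """Call func(*args, **kwargs) with sys.stdout redirected into logger."""
    writer = LoggerWriter(logger)
    with contextlib.redirect_stdout(writer):
        try:
            return func(*args, **kwargs)
        finally:
            writer.flush()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format="%(levelname)s - %(name)s | %(message)s")
    # Logger name mirrors the "prefect.test2" logger in the output above.
    run_with_stdout_to(logging.getLogger("prefect.test2"), print, "test 2")
```

In MyTask.run(), the equivalent would be wrapping the print call in contextlib.redirect_stdout(LoggerWriter(self.logger)).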
Is there a way to have the Task stdout logged in the Task logger? I'm trying to keep a log per task, including the task stdout.
Alvis Tang
02/04/2021, 9:54 PM
run_agent but I can't make it work.
I tried to use
flow.executor = LocalDaskExecutor()
flow.run()
and it can run in parallel, but not with run_agent. Any clue?
Chris Jordan
02/04/2021, 10:10 PM
rerun_flow = StartFlowRun(
    flow_name="blast_import_flow",
    project_name="python_imports"
)
spawn_time_series_import = StartFlowRun(
    project_name="python_imports",
    flow_name="blast_metric_series_flow",
    wait=True)

with Flow("blast_import_flow",
          schedule=daily_schedule,
          state_handlers=[cloud_only_slack_handler]
          ) as flow:
    data_object = get_blast_batch()
    check_api_result = check_api(data_object=data_object)
    transformed_data_objects = transform(data_object=check_api_result)
    imported = import_to_database(data_object_list=transformed_data_objects)
    blast_ids, record_length = push_records_to_summary_table(imported)
    # package ids into the right dict format
    packaged_ids = package_into_params.map(blast_id=blast_ids)
    spawn = spawn_time_series_import.map(
        parameters=packaged_ids)
    with case(record_length, 5):
        rerun_flow()
The ETL portions of the flow are working just fine, and spawn_time_series_import is correctly creating its tasks. But rerun_flow isn't; I'm seeing this error:
Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 449, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
run_link = client.get_cloud_url("flow-run", flow_run_id)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 887, in get_cloud_url
tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 920, in get_default_tenant_slug
res = self.graphql(query)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 318, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
I'm not sure what the difference is, aside from the wait. I figure I don't want a bunch of these telescoping and waiting on each other to finish.
Danny Vilela
02/04/2021, 11:21 PM
from prefect import Flow
from prefect import Task

class AddTask(Task):
    def run(self, x: int, y: int) -> int:
        return x + y

with Flow(name="adding") as flow:
    adder: AddTask = AddTask()
    a: int = adder(x=1, y=2)
    b: int = adder(x=a, y=3)
    print(a, b)

flow.run()
It runs just fine, but I get two different “warning” highlights from my IDE (PyCharm):
1. On the def run(self, x: int, y: int) -> int line: Signature of method 'AddTask.run()' does not match signature of base method in class 'Task'
2. On both assignment lines in the flow: Expected type 'int', got 'AddTask' instead
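For the first warning, mypy reports this kind of mismatch as an [override] error, because AddTask.run adds required parameters to a method whose base signature takes none; a targeted "# type: ignore[override]" comment silences just that error on one line (a bare "# noqa" is a flake8 convention that mypy does not read). For the second warning, calling a task inside a Flow block returns a Task rather than the result of run(), so annotating the variable as a Task matches what is actually assigned. The Task class below is a hypothetical, heavily reduced stand-in so the sketch runs without Prefect; only the base run() signature and the fact that calling a task returns a task are taken from the discussion.

```python
import copy
from typing import Any, Dict


class Task:
    """Hypothetical stand-in for prefect.Task (not its real implementation)."""

    def __init__(self) -> None:
        self.inputs: Dict[str, Any] = {}

    def run(self) -> None:
        """Base signature: no task inputs, returns None."""

    def __call__(self, **kwargs: Any) -> "Task":
        # Mirrors the functional API's behavior: calling a task returns a
        # copy of the task bound to its inputs, not the value run() returns.
        new = copy.copy(self)
        new.inputs = dict(kwargs)
        return new


class AddTask(Task):
    # Adding required parameters breaks substitutability with Task.run, which
    # mypy reports as [override]; the comment below silences only that code.
    def run(self, x: int, y: int) -> int:  # type: ignore[override]
        return x + y


adder = AddTask()
a: Task = adder(x=1, y=2)  # the call returns a Task, so annotate it as one
b: Task = adder(x=a, y=3)  # upstream tasks are passed in as inputs
print(adder.run(x=1, y=2))  # prints 3: calling run() directly returns an int
```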
I’d like to know what I’m doing that’s un-Prefect-like. Does Prefect work well with type annotations? I’m aware that, really, the x and y parameters to AddTask are actually converted to parameters (or a constant task?). But that’s maybe not as clear as annotating them as integers.
Michael Adkins
02/04/2021, 11:24 PM
Danny Vilela
02/04/2021, 11:32 PM
run method? It’s no real problem to silence mypy/the IDE on those lines (e.g., # noqa), but just making sure 🙂
Michael Adkins
02/04/2021, 11:33 PM
Danny Vilela
02/04/2021, 11:35 PM
Signature of method 'AddTask.run()' does not match signature of base method in class 'Task'. For the second point you can just set
b: int = adder(x=a, y=2) # noqa
For the first, you’d have to do
def run(self, x: int, y: int) -> int: # noqa
Just checking that my understanding of how/when you need to silence the type checker is correct!
Michael Adkins
02/04/2021, 11:38 PM
def run(self) -> None:
you'll get a mismatch error there. I'm not sure why we don't have it take Any and return Any, but even then I think that PyCharm may complain.
Danny Vilela
02/04/2021, 11:40 PM
Michael Adkins
02/05/2021, 12:25 AM