Surawut Jirasaktavee
02/05/2023, 7:47 AM
Surawut Jirasaktavee
02/05/2023, 7:49 AM
╭─ │ ~/Gi/Data-/course-data-engineering-zoomcamp │ main
╰─ prefect agent start -q default ─╯
Starting v2.7.7 agent with ephemeral API...
___ ___ ___ ___ ___ ___ _____ _ ___ ___ _ _ _____
| _ \ _ \ __| __| __/ __|_ _| /_\ / __| __| \| |_ _|
| _/ / _|| _|| _| (__ | | / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_| /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): default...
14:45:09.182 | INFO | prefect.agent - Submitting flow run '850e6e36-f081-4733-b333-1142e9c3f820'
14:45:09.357 | INFO | prefect.infrastructure.docker-container - Pulling image 'premdocker2022/prefect-flow:v1'...
14:45:14.980 | INFO | prefect.infrastructure.docker-container - Creating Docker container 'shrewd-crow'...
14:45:15.073 | INFO | prefect.infrastructure.docker-container - Docker container 'shrewd-crow' has status 'created'
14:45:16.078 | INFO | prefect.agent - Completed submission of flow run '850e6e36-f081-4733-b333-1142e9c3f820'
14:45:16.089 | INFO | prefect.infrastructure.docker-container - Docker container 'shrewd-crow' has status 'running'
<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
/usr/local/lib/python3.9/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
07:45:34.877 | INFO | Flow run 'shrewd-crow' - Downloading flow code from storage at ''
07:45:43.445 | INFO | Flow run 'shrewd-crow' - Created subflow run 'natural-otter' for flow 'etl-web-to-gcs'
07:45:43.557 | INFO | Flow run 'natural-otter' - Created task run 'fetch-ba00c645-0' for task 'fetch'
07:45:43.562 | INFO | Flow run 'natural-otter' - Executing 'fetch-ba00c645-0' immediately...
07:45:44.404 | ERROR | Task run 'fetch-ba00c645-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1445, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "week_2/src/deployment/etl_web_to_gcs_parametize_4.py", line 17, in fetch
df = pd.read_csv(dataset_url) # pylint: disable=invalid-name
File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 211, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 331, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 950, in read_csv
return _read(filepath_or_buffer, kwds)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 605, in _read
parser = TextFileReader(filepath_or_buffer, **kwds)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 1442, in __init__
self._engine = self._make_engine(f, self.engine)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 1735, in _make_engine
self.handles = get_handle(
File "/usr/local/lib/python3.9/site-packages/pandas/io/common.py", line 713, in get_handle
ioargs = _get_filepath_or_buffer(
File "/usr/local/lib/python3.9/site-packages/pandas/io/common.py", line 363, in _get_filepath_or_buffer
with urlopen(req_info) as req:
File "/usr/local/lib/python3.9/site-packages/pandas/io/common.py", line 265, in urlopen
return urllib.request.urlopen(*args, **kwargs)
File "/usr/local/lib/python3.9/urllib/request.py", line 214, in urlopen
return opener.open(url, data, timeout)
File "/usr/local/lib/python3.9/urllib/request.py", line 523, in open
response = meth(req, response)
File "/usr/local/lib/python3.9/urllib/request.py", line 632, in http_response
response = self.parent.error(
File "/usr/local/lib/python3.9/urllib/request.py", line 561, in error
return self._call_chain(*args)
File "/usr/local/lib/python3.9/urllib/request.py", line 494, in _call_chain
result = func(*args)
File "/usr/local/lib/python3.9/urllib/request.py", line 641, in http_error_default
raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 404: Not Found
07:45:44.467 | ERROR | Task run 'fetch-ba00c645-0' - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1445, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "week_2/src/deployment/etl_web_to_gcs_parametize_4.py", line 17, in fetch
df = pd.read_csv(dataset_url) # pylint: disable=invalid-name
File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 211, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 331, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 950, in read_csv
return _read(filepath_or_buffer, kwds)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 605, in _read
parser = TextFileReader(filepath_or_buffer, **kwds)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 1442, in __init__
self._engine = self._make_engine(f, self.engine)
File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 1735, in _make_engine
self.handles = get_handle(
File "/usr/local/lib/python3.9/site-packages/pandas/io/common.py", line 713, in get_handle
ioargs = _get_filepath_or_buffer(
File "/usr/local/lib/python3.9/site-packages/pandas/io/common.py", line 363, in _get_filepath_or_buffer
with urlopen(req_info) as req:
File "/usr/local/lib/python3.9/site-packages/pandas/io/common.py", line 265, in urlopen
return urllib.request.urlopen(*args, **kwargs)
File "/usr/local/lib/python3.9/urllib/request.py", line 214, in urlopen
return opener.open(url, data, timeout)
File "/usr/local/lib/python3.9/urllib/request.py", line 523, in open
response = meth(req, response)
File "/usr/local/lib/python3.9/urllib/request.py", line 632, in http_response
response = self.parent.error(
File "/usr/local/lib/python3.9/urllib/request.py", line 561, in error
return self._call_chain(*args)
File "/usr/local/lib/python3.9/urllib/request.py", line 494, in _call_chain
result = func(*args)
File "/usr/local/lib/python3.9/urllib/request.py", line 641, in http_error_default
raise HTTPError(req.full_url, code, msg, hdrs, fp)
urllib.error.HTTPError: HTTP Error 404: Not Found
During handling of the above exception, another exception occurred:
Surawut Jirasaktavee
02/05/2023, 7:50 AM
TypeError: cannot pickle '_io.BufferedReader' object
07:45:44.552 | ERROR | Flow run 'natural-otter' - Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "week_2/src/deployment/etl_web_to_gcs_parametize_4.py", line 58, in etl_web_to_gcs
df = fetch(dataset_url) # pylint: disable=invalid-name
File "/usr/local/lib/python3.9/site-packages/prefect/tasks.py", line 436, in __call__
return enter_task_run_engine(
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 926, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1064, in get_task_call_return_value
return await future._result()
File "/usr/local/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 89, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.9/site-packages/prefect/task_runners.py", line 207, in submit
result = await call()
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1303, in begin_task_run
state = await orchestrate_task_run(
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1464, in orchestrate_task_run
terminal_state = await exception_to_failed_state(
File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 187, in exception_to_failed_state
data = await result_factory.create_result(exc)
File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 317, in create_result
return await PersistedResult.create(
File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 449, in create
data = serializer.dumps(obj)
File "/usr/local/lib/python3.9/site-packages/prefect/serializers.py", line 142, in dumps
blob = pickler.dumps(obj)
File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_io.BufferedReader' object
07:45:44.673 | ERROR | Flow run 'natural-otter' - Finished in state Failed("Flow run encountered an exception. TypeError: cannot pickle '_io.BufferedReader' object\n")
07:45:44.710 | ERROR | Flow run 'shrewd-crow' - Finished in state Failed('1/1 states failed.')
Flow is not complete...
14:45:46.027 | INFO | prefect.infrastructure.docker-container - Docker container 'shrewd-crow' has status 'removing'
14:45:46.036 | INFO | prefect.infrastructure.docker-container - Docker container 'shrewd-crow' has status 'removing'
Surawut Jirasaktavee
02/05/2023, 7:50 AM
FROM prefecthq/prefect:2.7.7-python3.9
COPY docker-requirements.txt .
RUN pip install -r docker-requirements.txt --trusted-host pypi.python.org --no-cache-dir
ENV PREFECT_API_URL=http://host.docker.internal:4200/api
RUN prefect config set PREFECT_API_URL="${PREFECT_API_URL}"
RUN mkdir -p /opt/prefect/data/green
RUN mkdir -p /opt/prefect/deployment
COPY deployment /opt/prefect/deployment
Surawut Jirasaktavee
02/05/2023, 7:51 AM
# type: ignore
# pylint: disable=missing-module-docstring
from etl_web_to_gcs_parametize_4 import etl_parent_flow # pylint: disable=import-error
from prefect.deployments import Deployment # pylint: disable=import-error
from prefect.infrastructure.docker import DockerContainer # pylint: disable=import-error, ungrouped-imports
from prefect.filesystems import GitHub # pylint: disable=import-error
github_block = GitHub.load("github-prefect-flow")
docker_container_block = DockerContainer.load("container-prefect-flow")
docker_dep = Deployment.build_from_flow(
    flow=etl_parent_flow,
    name="docker-flow",
    version="1",
    description="load data from web and save it to gcs",
    tags=["docker-infra", "github-storage", "web-to-gcs"],
    storage=github_block,
    infrastructure=docker_container_block,
    work_queue_name="default",
    path="",
    entrypoint="week_2/src/deployment/etl_web_to_gcs_parametize_4.py:etl_parent_flow",
    parameters={"months": [11], "year": 2022, "color": "green"},
    output="./week_2/src/deployment/prefect-docker-deployment-web-to-gcs-green.yaml",
)
if __name__ == "__main__":
    docker_dep.apply()
Surawut Jirasaktavee
02/05/2023, 7:52 AM
etl_parent_flow works well if I run it locally.
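A note on the two errors in the log above: the traceback suggests the `TypeError` is secondary. The `fetch` task failed with `HTTP Error 404: Not Found`, and the pickle error was raised afterwards, while Prefect was serializing that exception as the task's result (`exception_to_failed_state`, then `create_result`, then `cloudpickle`). `urllib.error.HTTPError` keeps a handle to the response body, which pickle rejects. A minimal sketch, assuming it is acceptable to drop the response body: convert the error to a plain `RuntimeError` before it leaves the task. The helper name `picklable_http_error` and the example URL are hypothetical and not part of the original code.

```python
import io
import pickle
import urllib.error


def picklable_http_error(exc: urllib.error.HTTPError, url: str) -> RuntimeError:
    """Return a plain exception that keeps the status, reason, and URL.

    Hypothetical helper: it drops the response body handle (exc.fp),
    which is the part that pickle cannot serialize.
    """
    return RuntimeError(f"HTTP {exc.code} ({exc.reason}) for {url}")


# Stand-in for the error raised inside pd.read_csv; the URL is a placeholder.
url = "https://example.com/missing.csv"
original = urllib.error.HTTPError(url, 404, "Not Found", None, io.BytesIO())

converted = picklable_http_error(original, url)

# The converted exception survives a pickle round trip.
restored = pickle.loads(pickle.dumps(converted))
print(restored)
```

Inside `fetch`, this could be used as `except urllib.error.HTTPError as exc: raise picklable_http_error(exc, dataset_url) from None`, so that the 404 and the URL that produced it are what show up in the flow run. Whether this fully avoids the serialization error in Prefect 2.7.7 has not been verified here.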